This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 35829fddcbc MINOR: Cleanups in ConfigurationUtils (#18576)
35829fddcbc is described below
commit 35829fddcbcf375eb0462d07a51bf8becb1b8757
Author: Luke Chen <[email protected]>
AuthorDate: Mon Jan 20 21:31:26 2025 +0900
MINOR: Cleanups in ConfigurationUtils (#18576)
Some cleanups in ConfigurationUtils for v3.x.
Reviewers: Mickael Maison <[email protected]>
---
.../config/internals/BrokerSecurityConfigs.java | 3 +
.../secured/AccessTokenRetrieverFactory.java | 1 +
.../internals/secured/ConfigurationUtils.java | 22 ++++++
.../secured/VerificationKeyResolverFactory.java | 1 +
.../secured/AccessTokenRetrieverFactoryTest.java | 23 ++++++-
.../internals/secured/ConfigurationUtilsTest.java | 36 ++++++++++
.../VerificationKeyResolverFactoryTest.java | 80 ++++++++++++++++++++++
7 files changed, 164 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 0021c3d11ff..a538fa0b6ae 100644
---
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -130,6 +130,9 @@ public class BrokerSecurityConfigs {
public static final String SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG =
"sasl.mechanism.inter.broker.protocol";
public static final String SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC =
"SASL mechanism used for inter-broker communication. Default is GSSAPI.";
+
+ // The allowlist of the SASL OAUTHBEARER endpoints
+ public static final String ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG =
"org.apache.kafka.sasl.oauthbearer.allowed.urls";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
// General Security Configuration
.define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG,
LONG, BrokerSecurityConfigs.DEFAULT_CONNECTIONS_MAX_REAUTH_MS, MEDIUM,
BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java
index 0ed4a1a2303..d471d647374 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java
@@ -58,6 +58,7 @@ public class AccessTokenRetrieverFactory {
String saslMechanism,
Map<String, Object> jaasConfig) {
ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
+ cu.throwIfURLIsNotAllowed(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
URL tokenEndpointUrl =
cu.validateUrl(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
if
(tokenEndpointUrl.getProtocol().toLowerCase(Locale.ROOT).equals("file")) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
index 0be91cdfb5b..77882621b41 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
@@ -25,8 +25,13 @@ import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
/**
* <code>ConfigurationUtils</code> is a utility class to perform basic
configuration-related
@@ -228,4 +233,21 @@ public class ConfigurationUtils {
return (T) configs.get(name);
}
+ // make sure the url is in the
"org.apache.kafka.sasl.oauthbearer.allowed.urls" system property
+ public void throwIfURLIsNotAllowed(String urlConfig) {
+ String allowedUrlsProp =
System.getProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ if (allowedUrlsProp == null) {
+ // by default, we accept all URLs
+ return;
+ }
+ Set<String> allowedUrlsList = Arrays.stream(allowedUrlsProp.split(","))
+ .map(String::trim)
+ .collect(Collectors.toSet());
+
+ String value = get(urlConfig);
+ if (!allowedUrlsList.contains(value)) {
+ throw new IllegalArgumentException(value + " is not allowed.
Update system property '"
+ + ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG + "' to allow " +
value);
+ }
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactory.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactory.java
index 0422045fc02..4f12b2724b5 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactory.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactory.java
@@ -58,6 +58,7 @@ public class VerificationKeyResolverFactory {
String saslMechanism,
Map<String, Object> jaasConfig) {
ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
+ cu.throwIfURLIsNotAllowed(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
URL jwksEndpointUrl =
cu.validateUrl(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
if
(jwksEndpointUrl.getProtocol().toLowerCase(Locale.ROOT).equals("file")) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
index 478e2baba1d..a34d71a3d53 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
@@ -19,6 +19,7 @@ package
org.apache.kafka.common.security.oauthbearer.internals.secured;
import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -32,10 +33,16 @@ import java.util.stream.Stream;
import static
org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class AccessTokenRetrieverFactoryTest extends OAuthBearerTest {
+ @AfterEach
+ public void tearDown() throws Exception {
+ System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ }
+
@Test
public void testConfigureRefreshingFileAccessTokenRetriever() throws
Exception {
String expected = "{}";
@@ -55,14 +62,15 @@ public class AccessTokenRetrieverFactoryTest extends
OAuthBearerTest {
@Test
public void
testConfigureRefreshingFileAccessTokenRetrieverWithInvalidDirectory() {
// Should fail because the parent path doesn't exist.
- Map<String, ?> configs =
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, new
File("/tmp/this-directory-does-not-exist/foo.json").toURI().toString());
+ String file = new
File("/tmp/this-directory-does-not-exist/foo.json").toURI().toString();
+ Map<String, ?> configs =
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, file);
Map<String, Object> jaasConfig = Collections.emptyMap();
assertThrowsWithMessage(ConfigException.class, () ->
AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
}
@Test
public void
testConfigureRefreshingFileAccessTokenRetrieverWithInvalidFile() throws
Exception {
- // Should fail because the while the parent path exists, the file
itself doesn't.
+ // Should fail because while the parent path exists, the file itself
doesn't.
File tmpDir = createTempDir("this-directory-does-exist");
File accessTokenFile = new File(tmpDir,
"this-file-does-not-exist.json");
Map<String, ?> configs =
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL,
accessTokenFile.toURI().toString());
@@ -70,6 +78,17 @@ public class AccessTokenRetrieverFactoryTest extends
OAuthBearerTest {
assertThrowsWithMessage(ConfigException.class, () ->
AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
}
+ @Test
+ public void testSaslOauthbearerTokenEndpointUrlIsNotAllowed() throws
Exception {
+ // Should fail because the file is not in the allowed list
+ File tmpDir = createTempDir("not_allowed");
+ File accessTokenFile = new File(tmpDir, "not_allowed.json");
+ System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, "nothing");
+ Map<String, ?> configs =
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL,
accessTokenFile.toURI().toString());
+ assertThrowsWithMessage(IllegalArgumentException.class, () ->
AccessTokenRetrieverFactory.create(configs, Collections.emptyMap()),
+ ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ }
+
@ParameterizedTest
@MethodSource("urlencodeHeaderSupplier")
public void testUrlencodeHeader(Map<String, Object> configs, boolean
expectedValue) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
index e2a8ba135c0..0d3df183ae3 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
@@ -20,16 +20,27 @@ package
org.apache.kafka.common.security.oauthbearer.internals.secured;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import static
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
public class ConfigurationUtilsTest extends OAuthBearerTest {
private static final String URL_CONFIG_NAME = "url";
+ private static final String FILE_CONFIG_NAME = "file";
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ }
@Test
public void testUrl() {
@@ -129,6 +140,31 @@ public class ConfigurationUtilsTest extends
OAuthBearerTest {
assertThrowsWithMessage(ConfigException.class, () -> testFile(" "),
"must not contain only whitespace");
}
+ @Test
+ public void testAllowedSaslOauthbearerUrlSystemProperty() {
+ String url = "http://www.example.com";
+ String fileUrl = "file:///etc/passwd";
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(URL_CONFIG_NAME, url);
+ configs.put(FILE_CONFIG_NAME, fileUrl);
+ ConfigurationUtils cu = new ConfigurationUtils(configs);
+
+ // By default, all URLs are allowed
+ assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(URL_CONFIG_NAME));
+ assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(FILE_CONFIG_NAME));
+
+ // add one url into allowed list
+ System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, url);
+ assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(URL_CONFIG_NAME));
+ assertThrowsWithMessage(IllegalArgumentException.class, () ->
cu.throwIfURLIsNotAllowed(FILE_CONFIG_NAME),
+ ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+
+ // add all urls into allowed list
+ System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, url + "," +
fileUrl);
+ assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(URL_CONFIG_NAME));
+ assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(FILE_CONFIG_NAME));
+ }
+
protected void testFile(String value) {
Map<String, Object> configs =
Collections.singletonMap(URL_CONFIG_NAME, value);
ConfigurationUtils cu = new ConfigurationUtils(configs);
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactoryTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactoryTest.java
new file mode 100644
index 00000000000..0c4abd3cbe1
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+
+import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL;
+import static
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
+
+public class VerificationKeyResolverFactoryTest extends OAuthBearerTest {
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ }
+
+ @Test
+ public void testConfigureRefreshingFileVerificationKeyResolver() throws
Exception {
+ File tmpDir = createTempDir("access-token");
+ File verificationKeyFile = createTempFile(tmpDir, "access-token-",
".json", "{}");
+
+ Map<String, ?> configs =
Collections.singletonMap(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL,
verificationKeyFile.toURI().toString());
+ Map<String, Object> jaasConfig = Collections.emptyMap();
+
+ // verify it won't throw exception
+ try (CloseableVerificationKeyResolver verificationKeyResolver =
VerificationKeyResolverFactory.create(configs, jaasConfig)) { }
+ }
+
+ @Test
+ public void
testConfigureRefreshingFileVerificationKeyResolverWithInvalidDirectory() {
+ // Should fail because the parent path doesn't exist.
+ String file = new
File("/tmp/this-directory-does-not-exist/foo.json").toURI().toString();
+ Map<String, ?> configs =
getSaslConfigs(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, file);
+ Map<String, Object> jaasConfig = Collections.emptyMap();
+ assertThrowsWithMessage(ConfigException.class, () ->
VerificationKeyResolverFactory.create(configs, jaasConfig), "that doesn't
exist");
+ }
+
+ @Test
+ public void
testConfigureRefreshingFileVerificationKeyResolverWithInvalidFile() throws
Exception {
+ // Should fail because while the parent path exists, the file itself
doesn't.
+ File tmpDir = createTempDir("this-directory-does-exist");
+ File verificationKeyFile = new File(tmpDir,
"this-file-does-not-exist.json");
+ Map<String, ?> configs =
getSaslConfigs(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL,
verificationKeyFile.toURI().toString());
+ Map<String, Object> jaasConfig = Collections.emptyMap();
+ assertThrowsWithMessage(ConfigException.class, () ->
VerificationKeyResolverFactory.create(configs, jaasConfig), "that doesn't
exist");
+ }
+
+ @Test
+ public void testSaslOauthbearerTokenEndpointUrlIsNotAllowed() throws
Exception {
+ // Should fail because the file is not in the allowed list
+ File tmpDir = createTempDir("not_allowed");
+ File verificationKeyFile = new File(tmpDir, "not_allowed.json");
+ System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, "nothing");
+ Map<String, ?> configs =
getSaslConfigs(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL,
verificationKeyFile.toURI().toString());
+ assertThrowsWithMessage(IllegalArgumentException.class, () ->
VerificationKeyResolverFactory.create(configs, Collections.emptyMap()),
+ ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ }
+}