This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 46d3442528 NIFI-14782 Extended Kafka3ConnectionService OAuth
authentication with SASL Extensions support
46d3442528 is described below
commit 46d3442528a34e42d0266560ae0a2ade0bc0f929
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Nov 26 16:53:07 2025 +0100
NIFI-14782 Extended Kafka3ConnectionService OAuth authentication with SASL
Extensions support
Signed-off-by: Pierre Villard <[email protected]>
This closes #10567.
---
.../kafka/service/Kafka3ConnectionService.java | 28 ++++++++++++++---
.../security/OAuthBearerLoginCallbackHandler.java | 29 ++++++++++++++++-
.../login/OAuthBearerLoginConfigProvider.java | 8 ++++-
.../nifi/kafka/shared/util/SaslExtensionUtil.java | 36 ++++++++++++++++++++++
.../validation/DynamicPropertyValidator.java | 6 +++-
5 files changed, 99 insertions(+), 8 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 3291d6aa1f..b29a444932 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -84,12 +84,16 @@ import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEY_
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE;
+import static
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX;
+import static
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
+import static
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix;
@Tags({"Apache", "Kafka", "Message", "Publish", "Consume"})
-@DynamicProperty(name = "The name of a Kafka configuration property.", value =
"The value of a given Kafka configuration property.",
- description = "These properties will be added on the Kafka
configuration after loading any provided configuration properties."
+@DynamicProperty(name = "The name of a Kafka configuration property or a SASL
extension property.", value = "The value of the given property.",
+ description = "Kafka configuration properties will be added on the
Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that
was already set, its value will be ignored and WARN message logged."
- + " For the list of available Kafka properties please refer
to: http://kafka.apache.org/documentation.html#configuration.",
+ + " For the list of available Kafka properties please refer
to: http://kafka.apache.org/documentation.html#configuration."
+ + " SASL extension properties can be specified in " +
SASL_EXTENSION_PROPERTY_PREFIX + "propertyName format (e.g. " +
SASL_EXTENSION_PROPERTY_PREFIX + "logicalCluster).",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
@CapabilityDescription("Provides and manages connections to Kafka Brokers for
producer or consumer operations.")
public class Kafka3ConnectionService extends AbstractControllerService
implements KafkaConnectionService, VerifiableControllerService,
KafkaClientComponent {
@@ -214,12 +218,26 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ final String propertyName;
+ final String propertyType;
+ final ExpressionLanguageScope expressionLanguageScope;
+
+ if (isSaslExtensionProperty(propertyDescriptorName)) {
+ propertyName =
removeSaslExtensionPropertyPrefix(propertyDescriptorName);
+ propertyType = "SASL Extension";
+ expressionLanguageScope = ExpressionLanguageScope.NONE;
+ } else {
+ propertyName = propertyDescriptorName;
+ propertyType = "Kafka Configuration";
+ expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT;
+ }
+
return new PropertyDescriptor.Builder()
- .description("Specifies the value for '" +
propertyDescriptorName + "' Kafka Configuration.")
+ .description("Specifies the value for '%s' %s
property.".formatted(propertyName, propertyType))
.name(propertyDescriptorName)
.addValidator(new
DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class))
.dynamic(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .expressionLanguageSupported(expressionLanguageScope)
.build();
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
index 69019eab85..4b4965d4aa 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
@@ -17,6 +17,8 @@
package org.apache.nifi.kafka.service.security;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
import org.apache.kafka.common.security.oauthbearer.ClientJwtValidator;
import org.apache.kafka.common.security.oauthbearer.JwtValidatorException;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
@@ -29,10 +31,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
+import static
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix;
/**
* {@link org.apache.kafka.common.security.auth.AuthenticateCallbackHandler}
implementation to support OAuth 2 in NiFi Kafka components.
@@ -49,6 +57,8 @@ public class OAuthBearerLoginCallbackHandler implements
AuthenticateCallbackHand
private OAuth2AccessTokenProvider accessTokenProvider;
private ClientJwtValidator accessTokenValidator;
+ private Map<String, String> saslExtensions;
+
@Override
public void configure(final Map<String, ?> configs, final String
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
final Map<String, Object> options =
JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
@@ -72,13 +82,23 @@ public class OAuthBearerLoginCallbackHandler implements
AuthenticateCallbackHand
this.accessTokenProvider = accessTokenProvider;
this.accessTokenValidator = new ClientJwtValidator();
this.accessTokenValidator.configure(configs, saslMechanism, List.of());
+
+ this.saslExtensions = options.entrySet().stream()
+ .filter(entry -> isSaslExtensionProperty(entry.getKey()))
+ .collect(Collectors.collectingAndThen(
+ Collectors.toMap(entry ->
removeSaslExtensionPropertyPrefix(entry.getKey()), entry ->
entry.getValue().toString()),
+ Collections::unmodifiableMap));
}
@Override
- public void handle(final Callback[] callbacks) {
+ public void handle(final Callback[] callbacks) throws
UnsupportedCallbackException {
for (final Callback callback : callbacks) {
if (callback instanceof OAuthBearerTokenCallback) {
handleTokenCallback((OAuthBearerTokenCallback) callback);
+ } else if (callback instanceof SaslExtensionsCallback) {
+ handleExtensionsCallback((SaslExtensionsCallback) callback);
+ } else {
+ throw new UnsupportedCallbackException(callback);
}
}
}
@@ -86,6 +106,8 @@ public class OAuthBearerLoginCallbackHandler implements
AuthenticateCallbackHand
private void handleTokenCallback(final OAuthBearerTokenCallback callback) {
final String accessToken;
try {
+ // Kafka's ExpiringCredentialRefreshingLogin calls this method
when the current token is about to expire and expects a refreshed token, so
forcefully update it
+ accessTokenProvider.refreshAccessDetails();
accessToken =
accessTokenProvider.getAccessDetails().getAccessToken();
} catch (Exception e) {
LOGGER.error("Could not retrieve access token", e);
@@ -102,6 +124,11 @@ public class OAuthBearerLoginCallbackHandler implements
AuthenticateCallbackHand
}
}
+ private void handleExtensionsCallback(final SaslExtensionsCallback
callback) {
+ // a unique SaslExtensions object must be returned otherwise it will
be lost upon relogin
+ callback.extensions(new SaslExtensions(saslExtensions));
+ }
+
@Override
public void close() {
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
index f7df5b241a..15fd34d1af 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
@@ -20,6 +20,7 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import static
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+import static
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
/**
* SASL OAuthBearer Login Module implementation of configuration provider
@@ -33,7 +34,8 @@ public class OAuthBearerLoginConfigProvider implements
LoginConfigProvider {
* Get JAAS configuration for activating OAuthBearer Login Module.
* The login module uses callback handlers to acquire Access Tokens.
NiFi's callback handler relies on {@link
org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller service to get the
token.
* The controller service will be passed to the callback handler via Kafka
config map (as an object, instead of the string-based JAAS config).
- * The JAAS config contains the service id in order to make the callback
handler unique to the given service (Kafka creates separate callback handlers
based on JAAS config).
+ * The JAAS config contains the service id and the SASL extension
properties in order to make the callback handler unique to the given
configuration
+ * (the Kafka framework creates separate callback handlers based on JAAS
config).
*
* @param context Property Context
* @return JAAS configuration with OAuthBearer Login Module
@@ -45,6 +47,10 @@ public class OAuthBearerLoginConfigProvider implements
LoginConfigProvider {
final String serviceId =
context.getProperty(KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).getValue();
builder.append(SERVICE_ID_KEY, serviceId);
+ context.getAllProperties().entrySet().stream()
+ .filter(entry -> isSaslExtensionProperty(entry.getKey()))
+ .forEach(entry -> builder.append(entry.getKey(),
entry.getValue()));
+
return builder.build();
}
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java
new file mode 100644
index 0000000000..c87207f7a7
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.util;
+
+/**
+ * Utility class to handle SASL Extension properties
+ */
+public class SaslExtensionUtil {
+
+ public static final String SASL_EXTENSION_PROPERTY_PREFIX =
"sasl_extension_";
+
+ private SaslExtensionUtil() {
+ }
+
+ public static boolean isSaslExtensionProperty(final String propertyName) {
+ return propertyName.startsWith(SASL_EXTENSION_PROPERTY_PREFIX);
+ }
+
+ public static String removeSaslExtensionPropertyPrefix(final String
propertyName) {
+ return propertyName.substring(SASL_EXTENSION_PROPERTY_PREFIX.length());
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
index a9b0585dd7..63f1630c07 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
@@ -25,6 +25,8 @@ import
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyNameP
import java.util.HashSet;
import java.util.Set;
+import static
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX;
+
/**
* Validator for dynamic Kafka properties
*/
@@ -48,10 +50,12 @@ public class DynamicPropertyValidator implements Validator {
if (subject.startsWith(PARTITIONS_PROPERTY_PREFIX)) {
builder.valid(true);
+ } else if (subject.startsWith(SASL_EXTENSION_PROPERTY_PREFIX)) {
+ builder.valid(true);
} else {
final boolean valid = clientPropertyNames.contains(subject);
builder.valid(valid);
- builder.explanation("must be a known Kafka client configuration
property");
+ builder.explanation("must be a known Kafka client configuration
property or a SASL extension property");
}
return builder.build();