This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 46d3442528 NIFI-14782 Extended Kafka3ConnectionService OAuth authentication with SASL Extensions support
46d3442528 is described below

commit 46d3442528a34e42d0266560ae0a2ade0bc0f929
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Nov 26 16:53:07 2025 +0100

    NIFI-14782 Extended Kafka3ConnectionService OAuth authentication with SASL Extensions support
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10567.
---
 .../kafka/service/Kafka3ConnectionService.java     | 28 ++++++++++++++---
 .../security/OAuthBearerLoginCallbackHandler.java  | 29 ++++++++++++++++-
 .../login/OAuthBearerLoginConfigProvider.java      |  8 ++++-
 .../nifi/kafka/shared/util/SaslExtensionUtil.java  | 36 ++++++++++++++++++++++
 .../validation/DynamicPropertyValidator.java       |  6 +++-
 5 files changed, 99 insertions(+), 8 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 3291d6aa1f..b29a444932 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -84,12 +84,16 @@ import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEY_
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE;
+import static 
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX;
+import static 
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
+import static 
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix;
 
 @Tags({"Apache", "Kafka", "Message", "Publish", "Consume"})
-@DynamicProperty(name = "The name of a Kafka configuration property.", value = 
"The value of a given Kafka configuration property.",
-        description = "These properties will be added on the Kafka 
configuration after loading any provided configuration properties."
+@DynamicProperty(name = "The name of a Kafka configuration property or a SASL 
extension property.", value = "The value of the given property.",
+        description = "Kafka configuration properties will be added on the 
Kafka configuration after loading any provided configuration properties."
                 + " In the event a dynamic property represents a property that 
was already set, its value will be ignored and WARN message logged."
-                + " For the list of available Kafka properties please refer 
to: http://kafka.apache.org/documentation.html#configuration.",
+                + " For the list of available Kafka properties please refer 
to: http://kafka.apache.org/documentation.html#configuration."
+                + " SASL extension properties can be specified in " + 
SASL_EXTENSION_PROPERTY_PREFIX + "propertyName format (e.g. " + 
SASL_EXTENSION_PROPERTY_PREFIX + "logicalCluster).",
         expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
 @CapabilityDescription("Provides and manages connections to Kafka Brokers for 
producer or consumer operations.")
 public class Kafka3ConnectionService extends AbstractControllerService 
implements KafkaConnectionService, VerifiableControllerService, 
KafkaClientComponent {
@@ -214,12 +218,26 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        final String propertyName;
+        final String propertyType;
+        final ExpressionLanguageScope expressionLanguageScope;
+
+        if (isSaslExtensionProperty(propertyDescriptorName)) {
+            propertyName = 
removeSaslExtensionPropertyPrefix(propertyDescriptorName);
+            propertyType = "SASL Extension";
+            expressionLanguageScope = ExpressionLanguageScope.NONE;
+        } else {
+            propertyName = propertyDescriptorName;
+            propertyType = "Kafka Configuration";
+            expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT;
+        }
+
         return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
+                .description("Specifies the value for '%s' %s 
property.".formatted(propertyName, propertyType))
                 .name(propertyDescriptorName)
                 .addValidator(new 
DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class))
                 .dynamic(true)
-                
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+                .expressionLanguageSupported(expressionLanguageScope)
                 .build();
     }
 
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
index 69019eab85..4b4965d4aa 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.kafka.service.security;
 
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
 import org.apache.kafka.common.security.oauthbearer.ClientJwtValidator;
 import org.apache.kafka.common.security.oauthbearer.JwtValidatorException;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
@@ -29,10 +31,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.AppConfigurationEntry;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
+import static 
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix;
 
 /**
  * {@link org.apache.kafka.common.security.auth.AuthenticateCallbackHandler} 
implementation to support OAuth 2 in NiFi Kafka components.
@@ -49,6 +57,8 @@ public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHand
     private OAuth2AccessTokenProvider accessTokenProvider;
     private ClientJwtValidator accessTokenValidator;
 
+    private Map<String, String> saslExtensions;
+
     @Override
     public void configure(final Map<String, ?> configs, final String 
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
         final Map<String, Object> options = 
JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
@@ -72,13 +82,23 @@ public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHand
         this.accessTokenProvider = accessTokenProvider;
         this.accessTokenValidator = new ClientJwtValidator();
         this.accessTokenValidator.configure(configs, saslMechanism, List.of());
+
+        this.saslExtensions = options.entrySet().stream()
+                .filter(entry -> isSaslExtensionProperty(entry.getKey()))
+                .collect(Collectors.collectingAndThen(
+                        Collectors.toMap(entry -> 
removeSaslExtensionPropertyPrefix(entry.getKey()), entry -> 
entry.getValue().toString()),
+                        Collections::unmodifiableMap));
     }
 
     @Override
-    public void handle(final Callback[] callbacks) {
+    public void handle(final Callback[] callbacks) throws 
UnsupportedCallbackException {
         for (final Callback callback : callbacks) {
             if (callback instanceof OAuthBearerTokenCallback) {
                 handleTokenCallback((OAuthBearerTokenCallback) callback);
+            } else if (callback instanceof SaslExtensionsCallback) {
+                handleExtensionsCallback((SaslExtensionsCallback) callback);
+            } else {
+                throw new UnsupportedCallbackException(callback);
             }
         }
     }
@@ -86,6 +106,8 @@ public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHand
     private void handleTokenCallback(final OAuthBearerTokenCallback callback) {
         final String accessToken;
         try {
+            // Kafka's ExpiringCredentialRefreshingLogin calls this method 
when the current token is about to expire and expects a refreshed token, so 
forcefully update it
+            accessTokenProvider.refreshAccessDetails();
             accessToken = 
accessTokenProvider.getAccessDetails().getAccessToken();
         } catch (Exception e) {
             LOGGER.error("Could not retrieve access token", e);
@@ -102,6 +124,11 @@ public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHand
         }
     }
 
+    private void handleExtensionsCallback(final SaslExtensionsCallback 
callback) {
+        // a unique SaslExtensions object must be returned otherwise it will 
be lost upon relogin
+        callback.extensions(new SaslExtensions(saslExtensions));
+    }
+
     @Override
     public void close() {
     }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
index f7df5b241a..15fd34d1af 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
@@ -20,6 +20,7 @@ import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
 
 import static 
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+import static 
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty;
 
 /**
  * SASL OAuthBearer Login Module implementation of configuration provider
@@ -33,7 +34,8 @@ public class OAuthBearerLoginConfigProvider implements 
LoginConfigProvider {
      * Get JAAS configuration for activating OAuthBearer Login Module.
      * The login module uses callback handlers to acquire Access Tokens. 
NiFi's callback handler relies on {@link 
org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller service to get the 
token.
      * The controller service will be passed to the callback handler via Kafka 
config map (as an object, instead of the string-based JAAS config).
-     * The JAAS config contains the service id in order to make the callback 
handler unique to the given service (Kafka creates separate callback handlers 
based on JAAS config).
+     * The JAAS config contains the service id and the SASL extension 
properties in order to make the callback handler unique to the given 
configuration
+     * (the Kafka framework creates separate callback handlers based on JAAS 
config).
      *
      * @param context Property Context
      * @return JAAS configuration with OAuthBearer Login Module
@@ -45,6 +47,10 @@ public class OAuthBearerLoginConfigProvider implements 
LoginConfigProvider {
         final String serviceId = 
context.getProperty(KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).getValue();
         builder.append(SERVICE_ID_KEY, serviceId);
 
+        context.getAllProperties().entrySet().stream()
+                .filter(entry -> isSaslExtensionProperty(entry.getKey()))
+                .forEach(entry -> builder.append(entry.getKey(), 
entry.getValue()));
+
         return builder.build();
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java
new file mode 100644
index 0000000000..c87207f7a7
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.util;
+
+/**
+ * Utility class to handle SASL Extension properties
+ */
+public class SaslExtensionUtil {
+
+    public static final String SASL_EXTENSION_PROPERTY_PREFIX = 
"sasl_extension_";
+
+    private SaslExtensionUtil() {
+    }
+
+    public static boolean isSaslExtensionProperty(final String propertyName) {
+        return propertyName.startsWith(SASL_EXTENSION_PROPERTY_PREFIX);
+    }
+
+    public static String removeSaslExtensionPropertyPrefix(final String 
propertyName) {
+        return propertyName.substring(SASL_EXTENSION_PROPERTY_PREFIX.length());
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
index a9b0585dd7..63f1630c07 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java
@@ -25,6 +25,8 @@ import 
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyNameP
 import java.util.HashSet;
 import java.util.Set;
 
+import static 
org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX;
+
 /**
  * Validator for dynamic Kafka properties
  */
@@ -48,10 +50,12 @@ public class DynamicPropertyValidator implements Validator {
 
         if (subject.startsWith(PARTITIONS_PROPERTY_PREFIX)) {
             builder.valid(true);
+        } else if (subject.startsWith(SASL_EXTENSION_PROPERTY_PREFIX)) {
+            builder.valid(true);
         } else {
             final boolean valid = clientPropertyNames.contains(subject);
             builder.valid(valid);
-            builder.explanation("must be a known Kafka client configuration 
property");
+            builder.explanation("must be a known Kafka client configuration 
property or a SASL extension property");
         }
 
         return builder.build();

Reply via email to