This is an automated email from the ASF dual-hosted git repository.

mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 864eba4af4 NIFI-7421 Added OAuth support to Kafka3ConnectionService
864eba4af4 is described below

commit 864eba4af44e7e250e97c471eab6c6b7546d2afc
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu May 29 18:17:29 2025 +0200

    NIFI-7421 Added OAuth support to Kafka3ConnectionService
    
    This closes #9979.
    
    Signed-off-by: Mark Bathori <[email protected]>
---
 .../nifi-kafka-service-aws/pom.xml                 |   4 +
 .../nifi-kafka-service-shared/pom.xml              |   4 +
 .../kafka/service/Kafka3ConnectionService.java     |  20 ++++
 .../security/OAuthBearerLoginCallbackHandler.java  | 117 +++++++++++++++++++++
 .../nifi-kafka-bundle/nifi-kafka-shared/pom.xml    |   4 +
 .../shared/component/KafkaClientComponent.java     |  10 ++
 .../login/DelegatingLoginConfigProvider.java       |   3 +-
 .../login/OAuthBearerLoginConfigProvider.java      |  50 +++++++++
 .../kafka/shared/property/KafkaClientProperty.java |   1 +
 .../nifi/kafka/shared/property/SaslMechanism.java  |   3 +-
 10 files changed, 214 insertions(+), 2 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
index b50076150f..551cfeeebb 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
@@ -43,6 +43,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-user-service-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service-api</artifactId>
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml
index 5975b0f431..c22a3d202e 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml
@@ -44,6 +44,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-user-service-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 7059fe7fc8..3d7f1dfdf5 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -49,6 +49,7 @@ import 
org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
 import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
 import org.apache.nifi.kafka.service.consumer.Subscription;
 import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
+import org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler;
 import org.apache.nifi.kafka.shared.property.IsolationLevel;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
@@ -57,6 +58,7 @@ import 
org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
 import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
 import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 
@@ -73,7 +75,10 @@ import java.util.regex.Pattern;
 
 import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
 import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+import static 
org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler.PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER;
 import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME;
+import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE;
+import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE;
@@ -236,6 +241,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
             SASL_MECHANISM,
             SASL_USERNAME,
             SASL_PASSWORD,
+            OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE,
             SELF_CONTAINED_KERBEROS_USER_SERVICE,
             KERBEROS_SERVICE_NAME,
             SSL_CONTEXT_SERVICE,
@@ -424,6 +430,8 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         final String configuredBootstrapServers = 
propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
configuredBootstrapServers);
 
+        setOAuthProperties(properties, propertyContext);
+
         setSslProperties(properties, propertyContext);
 
         final int defaultApiTimeoutMs = 
getDefaultApiTimeoutMs(propertyContext);
@@ -435,6 +443,18 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         return properties;
     }
 
+    private void setOAuthProperties(Properties properties, PropertyContext 
propertyContext) {
+        final SecurityProtocol securityProtocol = 
propertyContext.getProperty(SECURITY_PROTOCOL).asAllowableValue(SecurityProtocol.class);
+        if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || 
securityProtocol == SecurityProtocol.SASL_SSL) {
+            final SaslMechanism saslMechanism = 
propertyContext.getProperty(SASL_MECHANISM).asAllowableValue(SaslMechanism.class);
+            if (saslMechanism == SaslMechanism.OAUTHBEARER) {
+                
properties.put(SASL_LOGIN_CALLBACK_HANDLER_CLASS.getProperty(), 
OAuthBearerLoginCallbackHandler.class.getName());
+                final OAuth2AccessTokenProvider accessTokenProvider = 
propertyContext.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).asControllerService(OAuth2AccessTokenProvider.class);
+                
properties.put(PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER, 
accessTokenProvider);
+            }
+        }
+    }
+
     private void setSslProperties(final Properties properties, final 
PropertyContext context) {
         final PropertyValue sslContextServiceProperty = 
context.getProperty(SSL_CONTEXT_SERVICE);
         if (sslContextServiceProperty.isSet()) {
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
new file mode 100644
index 0000000000..d4b926c336
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.security;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import 
org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
+import 
org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
+import 
org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator;
+import 
org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException;
+import org.apache.nifi.kafka.shared.login.OAuthBearerLoginConfigProvider;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME;
+
+/**
+ * {@link org.apache.kafka.common.security.auth.AuthenticateCallbackHandler} 
implementation to support OAuth 2 in NiFi Kafka components.
+ * It uses {@link org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller 
service to acquire Access Tokens. The service reference is injected via the 
Kafka configuration.
+ * The service identifier will be validated against the serviceId provided in 
the JAAS configuration to ensure consistency.
+ * For Access Token validation and parsing, the handler relies on the Kafka 
OAuth support classes. Only the token retrieval is NiFi specific.
+ */
+public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHandler {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+    public static final String PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER 
= "nifi.oauth2.access.token.provider";
+
+    private OAuth2AccessTokenProvider accessTokenProvider;
+    private LoginAccessTokenValidator accessTokenValidator;
+
+    @Override
+    public void configure(final Map<String, ?> configs, final String 
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+        final Map<String, Object> options = 
JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
+
+        final String serviceId = (String) 
options.get(OAuthBearerLoginConfigProvider.SERVICE_ID_KEY);
+        if (serviceId == null) {
+            throw new ProcessException(String.format("JAAS configuration must 
contain %s. [%s]", OAuthBearerLoginConfigProvider.SERVICE_ID_KEY, options));
+        }
+
+        final Object service = 
configs.get(PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER);
+        if (!(service instanceof OAuth2AccessTokenProvider)) {
+            throw new 
ProcessException(String.format("OAuth2AccessTokenProvider must be provided via 
%s property in Kafka configuration", 
PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER));
+        }
+
+        final OAuth2AccessTokenProvider accessTokenProvider = 
(OAuth2AccessTokenProvider) service;
+        if (!accessTokenProvider.getIdentifier().equals(serviceId)) {
+            throw new 
ProcessException(String.format("OAuth2AccessTokenProvider's identifier [%s] 
does not match %s [%s] in JAAS configuration",
+                    accessTokenProvider.getIdentifier(), 
OAuthBearerLoginConfigProvider.SERVICE_ID_KEY, serviceId));
+        }
+
+        this.accessTokenProvider = accessTokenProvider;
+        this.accessTokenValidator = createAccessTokenValidator(configs, 
saslMechanism);
+    }
+
+    @Override
+    public void handle(final Callback[] callbacks) {
+        for (final Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerTokenCallback) {
+                handleTokenCallback((OAuthBearerTokenCallback) callback);
+            }
+        }
+    }
+
+    private void handleTokenCallback(final OAuthBearerTokenCallback callback) {
+        final String accessToken;
+        try {
+            accessToken = 
accessTokenProvider.getAccessDetails().getAccessToken();
+        } catch (Exception e) {
+            LOGGER.error("Could not retrieve access token", e);
+            callback.error("service_error", e.getMessage(), null);
+            return;
+        }
+
+        try {
+            final OAuthBearerToken token = 
accessTokenValidator.validate(accessToken);
+            callback.token(token);
+        } catch (ValidateException e) {
+            LOGGER.error("Could not validate and parse access token", e);
+            callback.error("invalid_token", e.getMessage(), null);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private LoginAccessTokenValidator createAccessTokenValidator(final 
Map<String, ?> configs, final String saslMechanism) {
+        final ConfigurationUtils cu = new ConfigurationUtils(configs, 
saslMechanism);
+        final String scopeClaimName = 
cu.get(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
+        final String subClaimName = cu.get(SASL_OAUTHBEARER_SUB_CLAIM_NAME);
+        return new LoginAccessTokenValidator(scopeClaimName, subClaimName);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml
index 93701df9f2..4acb284b46 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml
@@ -41,5 +41,9 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-security-kerberos-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 1a3bc36b6f..d4567a476f 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -21,6 +21,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 
@@ -141,4 +142,13 @@ public interface KafkaClientComponent {
             
.identifiesControllerService(SelfContainedKerberosUserService.class)
             .required(false)
             .build();
+
+    PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider-service")
+            .displayName("OAuth2 Access Token Provider Service")
+            .description("Service providing OAuth2 Access Tokens for 
authentication")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .dependsOn(SASL_MECHANISM, SaslMechanism.OAUTHBEARER)
+            .build();
 }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
index 8225225962..1d596544a5 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
@@ -33,7 +33,8 @@ public class DelegatingLoginConfigProvider implements 
LoginConfigProvider {
             SaslMechanism.PLAIN, new PlainLoginConfigProvider(),
             SaslMechanism.SCRAM_SHA_256, SCRAM_PROVIDER,
             SaslMechanism.SCRAM_SHA_512, SCRAM_PROVIDER,
-            SaslMechanism.AWS_MSK_IAM, new AwsMskIamLoginConfigProvider()
+            SaslMechanism.AWS_MSK_IAM, new AwsMskIamLoginConfigProvider(),
+            SaslMechanism.OAUTHBEARER, new OAuthBearerLoginConfigProvider()
     );
 
     /**
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
new file mode 100644
index 0000000000..f7df5b241a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+
+import static 
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+
+/**
+ * SASL OAuthBearer Login Module implementation of configuration provider
+ */
+public class OAuthBearerLoginConfigProvider implements LoginConfigProvider {
+    private static final String MODULE_CLASS_NAME = 
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";
+
+    public static final String SERVICE_ID_KEY = "serviceId";
+
+    /**
+     * Get JAAS configuration for activating OAuthBearer Login Module.
+     * The login module uses callback handlers to acquire Access Tokens. 
NiFi's callback handler relies on {@link 
org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller service to get the 
token.
+     * The controller service will be passed to the callback handler via Kafka 
config map (as an object, instead of the string-based JAAS config).
+     * The JAAS config contains the service id in order to make the callback 
handler unique to the given service (Kafka creates separate callback handlers 
based on JAAS config).
+     *
+     * @param context Property Context
+     * @return JAAS configuration with OAuthBearer Login Module
+     */
+    @Override
+    public String getConfiguration(final PropertyContext context) {
+        final LoginConfigBuilder builder = new 
LoginConfigBuilder(MODULE_CLASS_NAME, REQUIRED);
+
+        final String serviceId = 
context.getProperty(KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).getValue();
+        builder.append(SERVICE_ID_KEY, serviceId);
+
+        return builder.build();
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
index 4769f0299d..4b41d3cc49 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
@@ -23,6 +23,7 @@ public enum KafkaClientProperty {
     SASL_JAAS_CONFIG("sasl.jaas.config"),
     SASL_LOGIN_CLASS("sasl.login.class"),
     SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"),
+    SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"),
 
     SSL_KEYSTORE_LOCATION("ssl.keystore.location"),
     SSL_KEYSTORE_PASSWORD("ssl.keystore.password"),
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
index 2aefb9e4e7..bfcf6204fb 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
@@ -30,7 +30,8 @@ public enum SaslMechanism implements DescribedValue {
     SCRAM_SHA_256("SCRAM-SHA-256", "SCRAM-SHA-256", "Salted Challenge Response 
Authentication Mechanism using SHA-256 with username and password"),
     SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response 
Authentication Mechanism using SHA-512 with username and password"),
     AWS_MSK_IAM("AWS_MSK_IAM", "AWS_MSK_IAM", "Allows to use AWS IAM for 
authentication and authorization against Amazon MSK clusters that have AWS IAM 
enabled " +
-            "as an authentication mechanism. The IAM credentials will be found 
using the AWS Default Credentials Provider Chain.");
+            "as an authentication mechanism. The IAM credentials will be found 
using the AWS Default Credentials Provider Chain."),
+    OAUTHBEARER("OAUTHBEARER", "OAUTHBEARER", "Token-based authentication 
using OAuth 2.0 access tokens.");
 
     private final String value;
     private final String displayName;

Reply via email to