This is an automated email from the ASF dual-hosted git repository.
mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 864eba4af4 NIFI-7421 Added OAuth support to Kafka3ConnectionService
864eba4af4 is described below
commit 864eba4af44e7e250e97c471eab6c6b7546d2afc
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu May 29 18:17:29 2025 +0200
NIFI-7421 Added OAuth support to Kafka3ConnectionService
This closes #9979.
Signed-off-by: Mark Bathori <[email protected]>
---
.../nifi-kafka-service-aws/pom.xml | 4 +
.../nifi-kafka-service-shared/pom.xml | 4 +
.../kafka/service/Kafka3ConnectionService.java | 20 ++++
.../security/OAuthBearerLoginCallbackHandler.java | 117 +++++++++++++++++++++
.../nifi-kafka-bundle/nifi-kafka-shared/pom.xml | 4 +
.../shared/component/KafkaClientComponent.java | 10 ++
.../login/DelegatingLoginConfigProvider.java | 3 +-
.../login/OAuthBearerLoginConfigProvider.java | 50 +++++++++
.../kafka/shared/property/KafkaClientProperty.java | 1 +
.../nifi/kafka/shared/property/SaslMechanism.java | 3 +-
10 files changed, 214 insertions(+), 2 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
index b50076150f..551cfeeebb 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
@@ -43,6 +43,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml
index 5975b0f431..c22a3d202e 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/pom.xml
@@ -44,6 +44,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 7059fe7fc8..3d7f1dfdf5 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -49,6 +49,7 @@ import
org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.consumer.Subscription;
import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
+import org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler;
import org.apache.nifi.kafka.shared.property.IsolationLevel;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
@@ -57,6 +58,7 @@ import
org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@@ -73,7 +75,10 @@ import java.util.regex.Pattern;
import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+import static
org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler.PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME;
+import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE;
+import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE;
@@ -236,6 +241,7 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
SASL_MECHANISM,
SASL_USERNAME,
SASL_PASSWORD,
+ OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE,
SELF_CONTAINED_KERBEROS_USER_SERVICE,
KERBEROS_SERVICE_NAME,
SSL_CONTEXT_SERVICE,
@@ -424,6 +430,8 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
final String configuredBootstrapServers =
propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
configuredBootstrapServers);
+ setOAuthProperties(properties, propertyContext);
+
setSslProperties(properties, propertyContext);
final int defaultApiTimeoutMs =
getDefaultApiTimeoutMs(propertyContext);
@@ -435,6 +443,18 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
return properties;
}
+ private void setOAuthProperties(Properties properties, PropertyContext
propertyContext) {
+ final SecurityProtocol securityProtocol =
propertyContext.getProperty(SECURITY_PROTOCOL).asAllowableValue(SecurityProtocol.class);
+ if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT ||
securityProtocol == SecurityProtocol.SASL_SSL) {
+ final SaslMechanism saslMechanism =
propertyContext.getProperty(SASL_MECHANISM).asAllowableValue(SaslMechanism.class);
+ if (saslMechanism == SaslMechanism.OAUTHBEARER) {
+
properties.put(SASL_LOGIN_CALLBACK_HANDLER_CLASS.getProperty(),
OAuthBearerLoginCallbackHandler.class.getName());
+ final OAuth2AccessTokenProvider accessTokenProvider =
propertyContext.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).asControllerService(OAuth2AccessTokenProvider.class);
+
properties.put(PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER,
accessTokenProvider);
+ }
+ }
+ }
+
private void setSslProperties(final Properties properties, final
PropertyContext context) {
final PropertyValue sslContextServiceProperty =
context.getProperty(SSL_CONTEXT_SERVICE);
if (sslContextServiceProperty.isSet()) {
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
new file mode 100644
index 0000000000..d4b926c336
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.security;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import
org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
+import
org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
+import
org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator;
+import
org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException;
+import org.apache.nifi.kafka.shared.login.OAuthBearerLoginConfigProvider;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
+import static
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME;
+
+/**
+ * {@link org.apache.kafka.common.security.auth.AuthenticateCallbackHandler}
implementation to support OAuth 2 in NiFi Kafka components.
+ * It uses {@link org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller
service to acquire Access Tokens. The service reference is injected via the
Kafka configuration.
+ * The service identifier will be validated against the serviceId provided in
the JAAS configuration to ensure consistency.
+ * For Access Token validation and parsing, the handler relies on the Kafka
OAuth support classes. Only the token retrieval is NiFi specific.
+ */
+public class OAuthBearerLoginCallbackHandler implements
AuthenticateCallbackHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+ public static final String PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER
= "nifi.oauth2.access.token.provider";
+
+ private OAuth2AccessTokenProvider accessTokenProvider;
+ private LoginAccessTokenValidator accessTokenValidator;
+
+ @Override
+ public void configure(final Map<String, ?> configs, final String
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+ final Map<String, Object> options =
JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
+
+ final String serviceId = (String)
options.get(OAuthBearerLoginConfigProvider.SERVICE_ID_KEY);
+ if (serviceId == null) {
+ throw new ProcessException(String.format("JAAS configuration must
contain %s. [%s]", OAuthBearerLoginConfigProvider.SERVICE_ID_KEY, options));
+ }
+
+ final Object service =
configs.get(PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER);
+ if (!(service instanceof OAuth2AccessTokenProvider)) {
+ throw new
ProcessException(String.format("OAuth2AccessTokenProvider must be provided via
%s property in Kafka configuration",
PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER));
+ }
+
+ final OAuth2AccessTokenProvider accessTokenProvider =
(OAuth2AccessTokenProvider) service;
+ if (!accessTokenProvider.getIdentifier().equals(serviceId)) {
+ throw new
ProcessException(String.format("OAuth2AccessTokenProvider's identifier [%s]
does not mach %s [%s] in JAAS configuration",
+ accessTokenProvider.getIdentifier(),
OAuthBearerLoginConfigProvider.SERVICE_ID_KEY, serviceId));
+ }
+
+ this.accessTokenProvider = accessTokenProvider;
+ this.accessTokenValidator = createAccessTokenValidator(configs,
saslMechanism);
+ }
+
+ @Override
+ public void handle(final Callback[] callbacks) {
+ for (final Callback callback : callbacks) {
+ if (callback instanceof OAuthBearerTokenCallback) {
+ handleTokenCallback((OAuthBearerTokenCallback) callback);
+ }
+ }
+ }
+
+ private void handleTokenCallback(final OAuthBearerTokenCallback callback) {
+ final String accessToken;
+ try {
+ accessToken =
accessTokenProvider.getAccessDetails().getAccessToken();
+ } catch (Exception e) {
+ LOGGER.error("Could not retrieve access token", e);
+ callback.error("service_error", e.getMessage(), null);
+ return;
+ }
+
+ try {
+ final OAuthBearerToken token =
accessTokenValidator.validate(accessToken);
+ callback.token(token);
+ } catch (ValidateException e) {
+ LOGGER.error("Could not validate and parse access token", e);
+ callback.error("invalid_token", e.getMessage(), null);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ private LoginAccessTokenValidator createAccessTokenValidator(final
Map<String, ?> configs, final String saslMechanism) {
+ final ConfigurationUtils cu = new ConfigurationUtils(configs,
saslMechanism);
+ final String scopeClaimName =
cu.get(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
+ final String subClaimName = cu.get(SASL_OAUTHBEARER_SUB_CLAIM_NAME);
+ return new LoginAccessTokenValidator(scopeClaimName, subClaimName);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml
index 93701df9f2..4acb284b46 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/pom.xml
@@ -41,5 +41,9 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 1a3bc36b6f..d4567a476f 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -21,6 +21,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@@ -141,4 +142,13 @@ public interface KafkaClientComponent {
.identifiesControllerService(SelfContainedKerberosUserService.class)
.required(false)
.build();
+
+ PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("oauth2-access-token-provider-service")
+ .displayName("OAuth2 Access Token Provider Service")
+ .description("Service providing OAuth2 Access Tokens for
authentication")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(true)
+ .dependsOn(SASL_MECHANISM, SaslMechanism.OAUTHBEARER)
+ .build();
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
index 8225225962..1d596544a5 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
@@ -33,7 +33,8 @@ public class DelegatingLoginConfigProvider implements
LoginConfigProvider {
SaslMechanism.PLAIN, new PlainLoginConfigProvider(),
SaslMechanism.SCRAM_SHA_256, SCRAM_PROVIDER,
SaslMechanism.SCRAM_SHA_512, SCRAM_PROVIDER,
- SaslMechanism.AWS_MSK_IAM, new AwsMskIamLoginConfigProvider()
+ SaslMechanism.AWS_MSK_IAM, new AwsMskIamLoginConfigProvider(),
+ SaslMechanism.OAUTHBEARER, new OAuthBearerLoginConfigProvider()
);
/**
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
new file mode 100644
index 0000000000..f7df5b241a
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+
+import static
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+
+/**
+ * SASL OAuthBearer Login Module implementation of configuration provider
+ */
+public class OAuthBearerLoginConfigProvider implements LoginConfigProvider {
+ private static final String MODULE_CLASS_NAME =
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";
+
+ public static final String SERVICE_ID_KEY = "serviceId";
+
+ /**
+ * Get JAAS configuration for activating OAuthBearer Login Module.
+ * The login module uses callback handlers to acquire Access Tokens.
NiFi's callback handler relies on {@link
org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller service to get the
token.
+ * The controller service will be passed to the callback handler via Kafka
config map (as an object, instead of the string-based JAAS config).
+ * The JAAS config contains the service id in order to make the callback
handler unique to the given service (Kafka creates separate callback handlers
based on JAAS config).
+ *
+ * @param context Property Context
+ * @return JAAS configuration with OAuthBearer Login Module
+ */
+ @Override
+ public String getConfiguration(final PropertyContext context) {
+ final LoginConfigBuilder builder = new
LoginConfigBuilder(MODULE_CLASS_NAME, REQUIRED);
+
+ final String serviceId =
context.getProperty(KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).getValue();
+ builder.append(SERVICE_ID_KEY, serviceId);
+
+ return builder.build();
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
index 4769f0299d..4b41d3cc49 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
@@ -23,6 +23,7 @@ public enum KafkaClientProperty {
SASL_JAAS_CONFIG("sasl.jaas.config"),
SASL_LOGIN_CLASS("sasl.login.class"),
SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"),
+ SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"),
SSL_KEYSTORE_LOCATION("ssl.keystore.location"),
SSL_KEYSTORE_PASSWORD("ssl.keystore.password"),
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
index 2aefb9e4e7..bfcf6204fb 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
@@ -30,7 +30,8 @@ public enum SaslMechanism implements DescribedValue {
SCRAM_SHA_256("SCRAM-SHA-256", "SCRAM-SHA-256", "Salted Challenge Response
Authentication Mechanism using SHA-512 with username and password"),
SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response
Authentication Mechanism using SHA-256 with username and password"),
AWS_MSK_IAM("AWS_MSK_IAM", "AWS_MSK_IAM", "Allows to use AWS IAM for
authentication and authorization against Amazon MSK clusters that have AWS IAM
enabled " +
- "as an authentication mechanism. The IAM credentials will be found
using the AWS Default Credentials Provider Chain.");
+ "as an authentication mechanism. The IAM credentials will be found
using the AWS Default Credentials Provider Chain."),
+ OAUTHBEARER("OAUTHBEARER", "OAUTHBEARER", "Token-based authentication
using OAuth 2.0 access tokens.");
private final String value;
private final String displayName;