This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f3a0539d94 NIFI-15172 Add support for Web Identity to AmazonMSKConnectionService (#10489)
f3a0539d94 is described below

commit f3a0539d94735e7c51b64935188a385b1b18e6cb
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Nov 10 21:08:11 2025 +0100

    NIFI-15172 Add support for Web Identity to AmazonMSKConnectionService (#10489)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-kafka-service-aws/pom.xml                 |  10 +
 .../service/aws/AmazonMSKConnectionService.java    | 281 ++++++++++++++++++++-
 .../aws/AmazonMSKCredentialsCallbackHandler.java   |  93 +++++++
 .../aws/AmazonMSKConnectionServiceTest.java        |  86 +++++++
 .../AmazonMSKCredentialsCallbackHandlerTest.java   |  88 +++++++
 .../kafka/service/Kafka3ConnectionService.java     |   6 +-
 .../nifi/kafka/shared/aws/AmazonMSKProperty.java   |  35 +++
 .../shared/component/KafkaClientComponent.java     |  17 +-
 .../shared/login/AwsMskIamLoginConfigProvider.java |   2 +-
 .../nifi/kafka/shared/property/AwsRoleSource.java  |   3 +-
 .../provider/StandardKafkaPropertyProvider.java    |  11 +-
 .../login/AwsMskIamLoginConfigProviderTest.java    |  37 +++
 .../StandardKafkaPropertyProviderTest.java         |  38 +++
 13 files changed, 697 insertions(+), 10 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
index 2c1e7df587..f4585c257d 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
@@ -61,5 +61,15 @@
             <artifactId>aws-msk-iam-auth</artifactId>
             <version>2.3.5</version>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sts</artifactId>
+            <version>${software.amazon.awssdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+            <version>${software.amazon.awssdk.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
index 32236d120a..0f064b4a75 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
@@ -16,18 +16,48 @@
  */
 package org.apache.nifi.kafka.service.aws;
 
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.kafka.service.Kafka3ConnectionService;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
 import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
 import org.apache.nifi.kafka.shared.property.AwsRoleSource;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextProvider;
+import org.apache.nifi.util.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
+import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+import javax.net.ssl.SSLContext;
 
+import java.net.URI;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 @Tags({"AWS", "MSK", "streaming", "kafka"})
 @CapabilityDescription("Provides and manages connections to AWS MSK Kafka 
Brokers for producer or consumer operations.")
@@ -42,6 +72,57 @@ public class AmazonMSKConnectionService extends 
Kafka3ConnectionService {
             .defaultValue(SaslMechanism.AWS_MSK_IAM)
             .build();
 
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = 
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+    private static final int MIN_SESSION_DURATION_SECONDS = 900;
+    private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity Session Time")
+            .description("Session time for AWS STS AssumeRoleWithWebIdentity 
(between 900 seconds and 3600 seconds).")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(true)
+            .defaultValue("%d sec".formatted(MAX_SESSION_DURATION_SECONDS))
+            .addValidator(StandardValidators.createTimePeriodValidator(
+                    MIN_SESSION_DURATION_SECONDS, TimeUnit.SECONDS, 
MAX_SESSION_DURATION_SECONDS, TimeUnit.SECONDS))
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Region")
+            .description("Region identifier used for the AWS Security Token 
Service when exchanging Web Identity tokens.")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Endpoint")
+            .description("Optional endpoint override for the AWS Security 
Token Service.")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor 
AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER = new PropertyDescriptor.Builder()
+            .name("AWS Web Identity SSL Context Provider")
+            .description("SSL Context Service used when communicating with AWS 
STS for Web Identity federation.")
+            .identifiesControllerService(SSLContextProvider.class)
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .build();
+
     private final List<PropertyDescriptor> supportedPropertyDescriptors;
 
     public AmazonMSKConnectionService() {
@@ -58,6 +139,11 @@ public class AmazonMSKConnectionService extends 
Kafka3ConnectionService {
                 descriptors.add(KafkaClientComponent.AWS_PROFILE_NAME);
                 descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_ARN);
                 
descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME);
+                descriptors.add(AWS_WEB_IDENTITY_TOKEN_PROVIDER);
+                descriptors.add(AWS_WEB_IDENTITY_SESSION_TIME);
+                descriptors.add(AWS_WEB_IDENTITY_STS_REGION);
+                descriptors.add(AWS_WEB_IDENTITY_STS_ENDPOINT);
+                descriptors.add(AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER);
             }
         }
 
@@ -69,6 +155,199 @@ public class AmazonMSKConnectionService extends 
Kafka3ConnectionService {
         return supportedPropertyDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+
+        final AwsRoleSource roleSource = 
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            if 
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+                results.add(new ValidationResult.Builder()
+                        
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+                        .valid(false)
+                        .explanation("AWS Web Identity Token Provider must be 
configured when AWS Role Source is set to Web Identity Provider")
+                        .build());
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    protected Properties getProducerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
+        final Properties properties = 
super.getProducerProperties(propertyContext, defaultProperties);
+        setAuthenticationProperties(properties, propertyContext);
+        return properties;
+    }
+
+    @Override
+    protected Properties getConsumerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
+        final Properties properties = 
super.getConsumerProperties(propertyContext, defaultProperties);
+        setAuthenticationProperties(properties, propertyContext);
+        return properties;
+    }
+
+    @Override
+    protected Properties getClientProperties(final PropertyContext 
propertyContext) {
+        final Properties properties = 
super.getClientProperties(propertyContext);
+        setAuthenticationProperties(properties, propertyContext);
+        return properties;
+    }
+
+    private void setAuthenticationProperties(final Properties properties, 
final PropertyContext propertyContext) {
+        final AwsRoleSource roleSource = 
propertyContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            final AwsCredentialsProvider credentialsProvider = 
createWebIdentityCredentialsProvider(propertyContext);
+            
properties.put(AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty(),
 credentialsProvider);
+        }
+    }
+
+    private AwsCredentialsProvider createWebIdentityCredentialsProvider(final 
PropertyContext propertyContext) {
+        final OAuth2AccessTokenProvider tokenProvider = 
propertyContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+        if (tokenProvider == null) {
+            throw new IllegalStateException("AWS Web Identity Token Provider 
is required when AWS Role Source is set to Web Identity Provider");
+        }
+
+        final String roleArn = 
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+        final String roleSessionName = 
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+        final Long sessionTimeSeconds = 
propertyContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME).asTimePeriod(TimeUnit.SECONDS);
+        final Integer sessionSeconds = sessionTimeSeconds == null ? null : 
sessionTimeSeconds.intValue();
+        final String stsRegionId = 
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_REGION).getValue();
+        final String stsEndpoint = 
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_ENDPOINT).getValue();
+        final SSLContextProvider sslContextProvider = 
propertyContext.getProperty(AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER).asControllerService(SSLContextProvider.class);
+
+        final ApacheHttpClient.Builder httpClientBuilder = 
ApacheHttpClient.builder();
+
+        if (sslContextProvider != null) {
+            final SSLContext sslContext = sslContextProvider.createContext();
+            httpClientBuilder.socketFactory(new 
SSLConnectionSocketFactory(sslContext));
+        }
+
+        final StsClientBuilder stsClientBuilder = 
StsClient.builder().httpClient(httpClientBuilder.build());
+
+        if (!StringUtils.isBlank(stsRegionId)) {
+            stsClientBuilder.region(Region.of(stsRegionId));
+        }
+
+        if (!StringUtils.isBlank(stsEndpoint)) {
+            stsClientBuilder.endpointOverride(URI.create(stsEndpoint));
+        }
+
+        final StsClient stsClient = stsClientBuilder.build();
+
+        return new WebIdentityCredentialsProvider(stsClient, tokenProvider, 
roleArn, roleSessionName, sessionSeconds);
+    }
+
+    private static final class WebIdentityCredentialsProvider implements 
AwsCredentialsProvider, AutoCloseable {
+        private static final Duration SKEW = Duration.ofSeconds(60);
+
+        private final StsClient stsClient;
+        private final OAuth2AccessTokenProvider tokenProvider;
+        private final String roleArn;
+        private final String roleSessionName;
+        private final Integer sessionSeconds;
+
+        private volatile AwsSessionCredentials cachedCredentials;
+        private volatile Instant expiration;
+
+        private WebIdentityCredentialsProvider(final StsClient stsClient,
+                                               final OAuth2AccessTokenProvider 
tokenProvider,
+                                               final String roleArn,
+                                               final String roleSessionName,
+                                               final Integer sessionSeconds) {
+            this.stsClient = Objects.requireNonNull(stsClient, "stsClient 
required");
+            this.tokenProvider = Objects.requireNonNull(tokenProvider, 
"tokenProvider required");
+            this.roleArn = Objects.requireNonNull(roleArn, "roleArn required");
+            this.roleSessionName = Objects.requireNonNull(roleSessionName, 
"roleSessionName required");
+            this.sessionSeconds = sessionSeconds;
+        }
+
+        @Override
+        public AwsCredentials resolveCredentials() {
+            final Instant now = Instant.now();
+            final AwsSessionCredentials currentCredentials = cachedCredentials;
+            final Instant currentExpiration = expiration;
+
+            AwsSessionCredentials resolvedCredentials;
+            if (isCacheValid(now, currentCredentials, currentExpiration)) {
+                resolvedCredentials = currentCredentials;
+            } else {
+                synchronized (this) {
+                    final Instant refreshedNow = Instant.now();
+                    if (isCacheValid(refreshedNow, cachedCredentials, 
expiration)) {
+                        resolvedCredentials = cachedCredentials;
+                    } else {
+                        resolvedCredentials = refreshCredentials();
+                    }
+                }
+            }
+
+            return resolvedCredentials;
+        }
+
+        private String getWebIdentityToken() {
+            final AccessToken accessToken = tokenProvider.getAccessDetails();
+            if (accessToken == null) {
+                throw new IllegalStateException("OAuth2AccessTokenProvider 
returned null AccessToken");
+            }
+
+            final Map<String, Object> additionalParameters = 
accessToken.getAdditionalParameters();
+            String tokenValue = null;
+            if (additionalParameters != null) {
+                final Object idTokenValue = 
additionalParameters.get("id_token");
+                if (idTokenValue instanceof String idToken) {
+                    if (StringUtils.isBlank(idToken)) {
+                        throw new 
IllegalStateException("OAuth2AccessTokenProvider returned an empty id_token");
+                    }
+                    tokenValue = idToken;
+                }
+            }
+
+            if (tokenValue == null) {
+                final String accessTokenValue = accessToken.getAccessToken();
+                if (StringUtils.isBlank(accessTokenValue)) {
+                    throw new IllegalStateException("No usable token found in 
AccessToken (id_token or access_token)");
+                }
+                tokenValue = accessTokenValue;
+            }
+
+            return tokenValue;
+        }
+
+        private boolean isCacheValid(final Instant referenceTime, final 
AwsSessionCredentials credentials, final Instant credentialsExpiration) {
+            return credentials != null
+                    && credentialsExpiration != null
+                    && 
referenceTime.isBefore(credentialsExpiration.minus(SKEW));
+        }
+
+        private AwsSessionCredentials refreshCredentials() {
+            final String webIdentityToken = getWebIdentityToken();
+
+            final AssumeRoleWithWebIdentityRequest.Builder requestBuilder = 
AssumeRoleWithWebIdentityRequest.builder()
+                    .roleArn(roleArn)
+                    .roleSessionName(roleSessionName)
+                    .webIdentityToken(webIdentityToken);
+
+            if (sessionSeconds != null) {
+                requestBuilder.durationSeconds(sessionSeconds);
+            }
+
+            final AssumeRoleWithWebIdentityResponse response = 
stsClient.assumeRoleWithWebIdentity(requestBuilder.build());
+            final Credentials temporaryCredentials = response.credentials();
+            final AwsSessionCredentials sessionCredentials = 
AwsSessionCredentials.create(
+                    temporaryCredentials.accessKeyId(), 
temporaryCredentials.secretAccessKey(), temporaryCredentials.sessionToken());
+
+            cachedCredentials = sessionCredentials;
+            expiration = temporaryCredentials.expiration();
+            return sessionCredentials;
+        }
+
+        @Override
+        public void close() {
+            stsClient.close();
+        }
+    }
+
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         // For backward compatibility: if an AWS Profile Name was configured 
previously,
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java
new file mode 100644
index 0000000000..691f2e1e9f
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Callback handler that supplies AWS credentials sourced from NiFi 
configuration for AWS MSK IAM authentication.
+ */
+public class AmazonMSKCredentialsCallbackHandler implements 
AuthenticateCallbackHandler {
+
+    private AwsCredentialsProvider credentialsProvider;
+
+    @Override
+    public void configure(final Map<String, ?> configs, final String 
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!IAMLoginModule.MECHANISM.equals(saslMechanism)) {
+            throw new IllegalArgumentException("Unexpected SASL mechanism: " + 
saslMechanism);
+        }
+
+        final Object provider = 
configs.get(AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty());
+        if (!(provider instanceof AwsCredentialsProvider)) {
+            throw new IllegalArgumentException("Kafka configuration missing 
AWS Web Identity credentials provider");
+        }
+
+        credentialsProvider = (AwsCredentialsProvider) provider;
+
+        final AppConfigurationEntry loginModuleEntry = 
jaasConfigEntries.stream()
+                .filter(entry -> 
IAMLoginModule.class.getName().equals(entry.getLoginModuleName()))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("JAAS 
configuration missing IAMLoginModule entry"));
+
+        final Map<String, ?> jaasOptions = loginModuleEntry.getOptions();
+        final Object roleArn = jaasOptions.get("awsRoleArn");
+        final Object roleSessionName = jaasOptions.get("awsRoleSessionName");
+        if (roleArn == null || roleSessionName == null) {
+            throw new IllegalStateException("JAAS configuration missing 
required awsRoleArn or awsRoleSessionName options for Web Identity 
authentication");
+        }
+    }
+
+    @Override
+    public void handle(final Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
+        for (final Callback callback : callbacks) {
+            if (callback instanceof AWSCredentialsCallback 
awsCredentialsCallback) {
+                handleCredentialsCallback(awsCredentialsCallback);
+            } else {
+                throw new UnsupportedCallbackException(callback,
+                        "Unsupported callback type 
[%s]".formatted(callback.getClass().getName()));
+            }
+        }
+    }
+
+    private void handleCredentialsCallback(final AWSCredentialsCallback 
callback) {
+        try {
+            final AwsCredentials awsCredentials = 
credentialsProvider.resolveCredentials();
+            callback.setAwsCredentials(awsCredentials);
+        } catch (final Exception e) {
+            callback.setLoadingException(e);
+        }
+    }
+
+    /**
+     * No resources to close because the credentials provider lifecycle is 
managed by the NiFi service.
+     */
+    @Override
+    public void close() {
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
index c3dfc79e8b..e174c95719 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
@@ -16,12 +16,29 @@
  */
 package org.apache.nifi.kafka.service.aws;
 
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.NoOpProcessor;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class AmazonMSKConnectionServiceTest {
 
@@ -47,4 +64,73 @@ class AmazonMSKConnectionServiceTest {
 
         runner.enableControllerService(service);
     }
+
+    @Test
+    void testWebIdentityWithoutCredentialsServiceNotValid() throws 
InitializationException {
+        final AmazonMSKConnectionService service = new 
AmazonMSKConnectionService();
+        runner.addControllerService(SERVICE_ID, service);
+
+        runner.setProperty(service, 
AmazonMSKConnectionService.BOOTSTRAP_SERVERS, BOOTSTRAP_SERVERS);
+        runner.setProperty(service, KafkaClientComponent.AWS_ROLE_SOURCE, 
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+        runner.setProperty(service, KafkaClientComponent.AWS_ASSUME_ROLE_ARN, 
"arn:aws:iam::123456789012:role/TestRole");
+        runner.setProperty(service, 
KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, "nifi-session");
+
+        runner.assertNotValid(service);
+    }
+
+    @Test
+    void testWebIdentityWithTokenProviderAddsProperty() throws 
InitializationException {
+        final TestAmazonMSKConnectionService service = new 
TestAmazonMSKConnectionService();
+        runner.addControllerService(SERVICE_ID, service);
+
+        final MockOAuth2AccessTokenProvider tokenProvider = new 
MockOAuth2AccessTokenProvider();
+        runner.addControllerService("webIdentityToken", tokenProvider);
+        runner.enableControllerService(tokenProvider);
+
+        runner.setProperty(service, 
AmazonMSKConnectionService.BOOTSTRAP_SERVERS, BOOTSTRAP_SERVERS);
+        runner.setProperty(service, KafkaClientComponent.AWS_ROLE_SOURCE, 
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+        runner.setProperty(service, KafkaClientComponent.AWS_ASSUME_ROLE_ARN, 
"arn:aws:iam::123456789012:role/TestRole");
+        runner.setProperty(service, 
KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, "nifi-session");
+        runner.setProperty(service, 
AmazonMSKConnectionService.AWS_WEB_IDENTITY_TOKEN_PROVIDER, "webIdentityToken");
+        runner.setProperty(service, 
AmazonMSKConnectionService.AWS_WEB_IDENTITY_STS_REGION, "us-east-1");
+
+        runner.assertValid(service);
+
+        runner.enableControllerService(service);
+
+        final Map<PropertyDescriptor, String> configuredProperties = new 
HashMap<>();
+        configuredProperties.put(AmazonMSKConnectionService.BOOTSTRAP_SERVERS, 
BOOTSTRAP_SERVERS);
+        configuredProperties.put(KafkaClientComponent.AWS_ROLE_SOURCE, 
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+        configuredProperties.put(KafkaClientComponent.AWS_ASSUME_ROLE_ARN, 
"arn:aws:iam::123456789012:role/TestRole");
+        
configuredProperties.put(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, 
"nifi-session");
+        
configuredProperties.put(AmazonMSKConnectionService.AWS_WEB_IDENTITY_TOKEN_PROVIDER,
 "webIdentityToken");
+        
configuredProperties.put(AmazonMSKConnectionService.AWS_WEB_IDENTITY_STS_REGION,
 "us-east-1");
+
+        final MockConfigurationContext configurationContext = new 
MockConfigurationContext(service, configuredProperties,
+                runner.getProcessContext().getControllerServiceLookup(), 
Collections.emptyMap());
+        configurationContext.setValidateExpressions(false);
+        final Properties properties = 
service.buildConsumerProperties(configurationContext);
+
+        final Object provider = 
properties.get(AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty());
+
+        assertNotNull(provider);
+        assertTrue(provider instanceof AwsCredentialsProvider);
+    }
+
+    private static class MockOAuth2AccessTokenProvider extends 
AbstractControllerService implements OAuth2AccessTokenProvider {
+        @Override
+        public AccessToken getAccessDetails() {
+            final AccessToken accessToken = new AccessToken();
+            accessToken.setAccessToken("mock-access-token");
+            accessToken.setAdditionalParameter("id_token", "mock-id-token");
+            return accessToken;
+        }
+    }
+
+    private static class TestAmazonMSKConnectionService extends 
AmazonMSKConnectionService {
+        Properties buildConsumerProperties(final MockConfigurationContext 
context) {
+            final Properties clientProperties = 
super.getClientProperties(context);
+            return super.getConsumerProperties(context, clientProperties);
+        }
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandlerTest.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandlerTest.java
new file mode 100644
index 0000000000..63903a5dbb
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandlerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class AmazonMSKCredentialsCallbackHandlerTest { // Exercises configure() and handle() of AmazonMSKCredentialsCallbackHandler
+
+    @Test
+    void testHandleCredentialsCallback() throws IOException, 
UnsupportedCallbackException { // Happy path: credentials from the configured provider reach the callback
+        final AwsCredentialsProvider credentialsProvider = 
StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKey", 
"secretKey"));
+
+        final AuthenticateCallbackHandler handler = new 
AmazonMSKCredentialsCallbackHandler();
+        final Map<String, Object> configs = Map.of(
+                
AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty(), 
credentialsProvider // the provider object itself is passed in the client configs under the NiFi-specific key
+        );
+
+        final Map<String, ?> options = Map.of(
+                "awsRoleArn", "arn:aws:iam::123456789012:role/TestRole",
+                "awsRoleSessionName", "nifi-session"
+        ); // JAAS options attached to the login module entry below
+        final AppConfigurationEntry configurationEntry = new 
AppConfigurationEntry(IAMLoginModule.class.getName(), 
LoginModuleControlFlag.REQUIRED, options);
+
+        handler.configure(configs, IAMLoginModule.MECHANISM, 
List.of(configurationEntry));
+
+        final AWSCredentialsCallback callback = new AWSCredentialsCallback();
+        handler.handle(new Callback[]{callback}); // the handler is expected to populate the callback with credentials
+
+        assertNotNull(callback.getAwsCredentials());
+        assertEquals("accessKey", callback.getAwsCredentials().accessKeyId()); // matches the static credentials created at the top of the test
+    }
+
+    @Test
+    void testConfigureWithoutCredentialsServiceThrows() { // No provider entry in the configs: configure() must fail with IllegalArgumentException
+        final AuthenticateCallbackHandler handler = new 
AmazonMSKCredentialsCallbackHandler();
+        final Map<String, Object> configs = Map.of(); // deliberately empty
+        final AppConfigurationEntry configurationEntry = new 
AppConfigurationEntry(IAMLoginModule.class.getName(), 
LoginModuleControlFlag.REQUIRED, Map.of());
+
+        assertThrows(IllegalArgumentException.class, () -> 
handler.configure(configs, IAMLoginModule.MECHANISM, 
List.of(configurationEntry)));
+    }
+
+    @Test
+    void testConfigureWithoutRequiredJaasOptionsThrows() { // Provider present but JAAS options empty: configure() must fail with IllegalStateException
+        final AwsCredentialsProvider credentialsProvider = 
StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKey", 
"secretKey"));
+        final AuthenticateCallbackHandler handler = new 
AmazonMSKCredentialsCallbackHandler();
+        final Map<String, Object> configs = Map.of(
+                
AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty(), 
credentialsProvider
+        );
+
+        final Map<String, ?> options = Map.of(); // deliberately empty: no awsRoleArn / awsRoleSessionName entries
+        final AppConfigurationEntry configurationEntry = new 
AppConfigurationEntry(IAMLoginModule.class.getName(), 
LoginModuleControlFlag.REQUIRED, options);
+
+        assertThrows(IllegalStateException.class, () -> 
handler.configure(configs, IAMLoginModule.MECHANISM, 
List.of(configurationEntry)));
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 8f95f4d03a..3291d6aa1f 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -318,7 +318,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         return results;
     }
 
-    private Properties getProducerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
+    protected Properties getProducerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
         final Properties properties = new Properties();
         properties.putAll(defaultProperties);
 
@@ -332,7 +332,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         return properties;
     }
 
-    private Properties getConsumerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
+    protected Properties getConsumerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
         final Properties properties = new Properties();
         properties.putAll(defaultProperties);
 
@@ -346,7 +346,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         return properties;
     }
 
-    private Properties getClientProperties(final PropertyContext 
propertyContext) {
+    protected Properties getClientProperties(final PropertyContext 
propertyContext) {
         final Properties properties = new Properties();
 
         final String configuredBootstrapServers = 
propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKProperty.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKProperty.java
new file mode 100644
index 0000000000..164188ee27
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKProperty.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.aws;
+
+/**
+ * AWS MSK-specific Kafka property keys shared between components.
+ */
+public enum AmazonMSKProperty { // Each constant wraps a single String exposed through getProperty()
+    NIFI_AWS_MSK_CREDENTIALS_PROVIDER("nifi.aws.msk.credentials.provider"), // key under which a credentials provider object is stored in the Kafka client properties
+    
NIFI_AWS_MSK_CALLBACK_HANDLER_CLASS("org.apache.nifi.kafka.service.aws.AmazonMSKCredentialsCallbackHandler"); // NOTE: a fully qualified class name, not a property key; used as the value of the SASL client callback handler class property
+
+    private final String property; // the wrapped key or class name
+
+    AmazonMSKProperty(final String property) {
+        this.property = property;
+    }
+
+    public String getProperty() { // returns the wrapped String unchanged
+        return property;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 5d88cf108e..65b3c6765f 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -140,7 +140,8 @@ public interface KafkaClientComponent {
             .required(true)
             .dependsOn(
                     KafkaClientComponent.AWS_ROLE_SOURCE,
-                    AwsRoleSource.SPECIFIED_ROLE
+                    AwsRoleSource.SPECIFIED_ROLE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
             )
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -152,12 +153,24 @@ public interface KafkaClientComponent {
             .required(true)
             .dependsOn(
                     KafkaClientComponent.AWS_ROLE_SOURCE,
-                    AwsRoleSource.SPECIFIED_ROLE
+                    AwsRoleSource.SPECIFIED_ROLE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
             )
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
 
+    PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = new 
PropertyDescriptor.Builder() // Reference to an OAuth2AccessTokenProvider Controller Service for the Web Identity role source
+            .name("AWS Web Identity Token Provider")
+            .description("Controller Service providing tokens with OAuth2 
OpenID Connect for AWS Web Identity federation.")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true) // declared required, but tied to the dependency below
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            ) // depends on AWS_ROLE_SOURCE being WEB_IDENTITY_TOKEN
+            .build();
+
     PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
             .description("Service supporting SSL communication with Kafka 
brokers")
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
index a6e02a6d48..404264a698 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
@@ -46,7 +46,7 @@ public class AwsMskIamLoginConfigProvider implements 
LoginConfigProvider {
             }
         }
 
-        if (roleSource == AwsRoleSource.SPECIFIED_ROLE) {
+        if (roleSource == AwsRoleSource.SPECIFIED_ROLE || roleSource == 
AwsRoleSource.WEB_IDENTITY_TOKEN) {
             final String assumeRoleArn = 
context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
             final String assumeRoleSessionName = 
context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
 
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
index 669723addd..9aed1268c8 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
@@ -24,7 +24,8 @@ import org.apache.nifi.components.DescribedValue;
 public enum AwsRoleSource implements DescribedValue {
     DEFAULT_PROFILE("Default Profile", "Use the default AWS credentials 
provider chain to locate credentials."),
     SPECIFIED_PROFILE("Specified Profile", "Use the configured AWS Profile 
Name from the default credentials file."),
-    SPECIFIED_ROLE("Specified Role", "Assume a specific AWS Role using the 
configured Role ARN and Session Name.");
+    SPECIFIED_ROLE("Specified Role", "Assume a specific AWS Role using the 
configured Role ARN and Session Name."),
+    WEB_IDENTITY_TOKEN("Web Identity Provider", "Obtain AWS MSK IAM 
credentials using STS AssumeRoleWithWebIdentity and a configured OAuth2/OIDC 
token provider.");
 
     private final String displayName;
     private final String description;
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
index 494629d2a4..f146e26f2e 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
@@ -20,6 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
 import org.apache.nifi.kafka.shared.login.DelegatingLoginConfigProvider;
 import org.apache.nifi.kafka.shared.login.LoginConfigProvider;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
@@ -35,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
+import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
 import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_MECHANISM;
 import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SECURITY_PROTOCOL;
 import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SSL_CONTEXT_SERVICE;
@@ -89,8 +91,13 @@ public class StandardKafkaPropertyProvider implements 
KafkaPropertyProvider {
             final SaslMechanism saslMechanism = 
context.getProperty(SASL_MECHANISM).asAllowableValue(SaslMechanism.class);
             if (saslMechanism == SaslMechanism.GSSAPI && 
isCustomKerberosLoginFound()) {
                 properties.put(SASL_LOGIN_CLASS.getProperty(), 
SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
-            } else if (saslMechanism == SaslMechanism.AWS_MSK_IAM && 
isAwsMskIamCallbackHandlerFound()) {
-                
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(), 
SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+            } else if (saslMechanism == SaslMechanism.AWS_MSK_IAM) {
+                final PropertyValue tokenProviderProperty = 
context.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER);
+                if (tokenProviderProperty != null && 
tokenProviderProperty.isSet()) {
+                    
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(), 
AmazonMSKProperty.NIFI_AWS_MSK_CALLBACK_HANDLER_CLASS.getProperty());
+                } else if (isAwsMskIamCallbackHandlerFound()) {
+                    
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(), 
SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+                }
             }
         }
     }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
index 6e49970a9f..0a519ac2e4 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
@@ -19,9 +19,13 @@ package org.apache.nifi.kafka.shared.login;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.util.NoOpProcessor;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.reporting.InitializationException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -70,4 +74,37 @@ class AwsMskIamLoginConfigProviderTest {
         
assertTrue(configuration.contains("awsRoleArn=\"arn:aws:iam::123456789012:role/MyRole\""),
 "awsRoleArn JAAS option not present");
         assertTrue(configuration.contains("awsRoleSessionName=\"MySession\""), 
"awsRoleSessionName JAAS option not present");
     }
+
+    @Test
+    void testConfigurationWithWebIdentityProvider() throws 
InitializationException { // Web Identity role source: the JAAS configuration must carry the role ARN and session name
+        runner.setProperty(KafkaClientComponent.SASL_MECHANISM, 
SaslMechanism.AWS_MSK_IAM);
+        runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, 
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+
+        runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN, 
"arn:aws:iam::123456789012:role/WebIdentityRole");
+        runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, 
"WebIdentitySession");
+
+        final MockOAuth2AccessTokenProvider tokenProvider = new 
MockOAuth2AccessTokenProvider();
+        runner.addControllerService("webIdentityToken", tokenProvider); // register and enable the stub before referencing it by identifier
+        runner.enableControllerService(tokenProvider);
+        
runner.setProperty(KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER, 
"webIdentityToken");
+
+        final PropertyContext context = runner.getProcessContext();
+        final String configuration = provider.getConfiguration(context);
+
+        assertNotNull(configuration);
+        
assertTrue(configuration.contains("awsRoleArn=\"arn:aws:iam::123456789012:role/WebIdentityRole\""),
+                "awsRoleArn JAAS option not present"); // role ARN configured above must appear verbatim
+        
assertTrue(configuration.contains("awsRoleSessionName=\"WebIdentitySession\""),
+                "awsRoleSessionName JAAS option not present"); // session name configured above must appear verbatim
+    }
+
+    private static class MockOAuth2AccessTokenProvider extends 
org.apache.nifi.controller.AbstractControllerService implements 
OAuth2AccessTokenProvider { // Test stub: returns a hard-coded AccessToken; base class is referenced by its fully qualified name
+        @Override
+        public AccessToken getAccessDetails() {
+            final AccessToken accessToken = new AccessToken(); // a new instance is built per invocation
+            accessToken.setAccessToken("mock-access-token");
+            accessToken.setAdditionalParameter("id_token", "mock-id-token"); // extra "id_token" parameter alongside the access token
+            return accessToken;
+        }
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
index a3df7c4c0c..bc13001068 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
@@ -17,10 +17,16 @@
 package org.apache.nifi.kafka.shared.property.provider;
 
 import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
 import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
 import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.NoOpProcessor;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -77,4 +83,36 @@ class StandardKafkaPropertyProviderTest {
         assertNotNull(saslConfigProperty, "SASL configuration not found");
         assertTrue(saslConfigProperty.toString().contains(SCRAM_LOGIN_MODULE), 
"SCRAM configuration not found");
     }
+
+    @Test
+    void testGetPropertiesAwsMskIamWithCredentialsService() throws 
InitializationException { // With a token provider configured, the NiFi callback handler class must be selected
+        runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, 
SecurityProtocol.SASL_PLAINTEXT.name());
+        runner.setProperty(KafkaClientComponent.SASL_MECHANISM, 
SaslMechanism.AWS_MSK_IAM);
+        runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE, 
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+        runner.setProperty(KafkaClientComponent.BOOTSTRAP_SERVERS, 
"localhost:9092");
+
+        runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN, 
"arn:aws:iam::123456789012:role/TestRole");
+        runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, 
"TestSession");
+
+        final MockOAuth2AccessTokenProvider tokenProvider = new 
MockOAuth2AccessTokenProvider();
+        runner.addControllerService("webIdentityToken", tokenProvider); // register and enable the stub before referencing it by identifier
+        runner.enableControllerService(tokenProvider);
+        
runner.setProperty(KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER, 
"webIdentityToken");
+
+        final PropertyContext propertyContext = runner.getProcessContext();
+        final Map<String, Object> properties = 
provider.getProperties(propertyContext);
+
+        final Object callbackHandler = 
properties.get(KafkaClientProperty.SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty());
+        
assertEquals(AmazonMSKProperty.NIFI_AWS_MSK_CALLBACK_HANDLER_CLASS.getProperty(),
 callbackHandler); // expected value is the handler class name held by the enum constant
+    }
+
+    private static class MockOAuth2AccessTokenProvider extends 
AbstractControllerService implements OAuth2AccessTokenProvider { // Test stub: returns a hard-coded AccessToken on every call
+        @Override
+        public AccessToken getAccessDetails() {
+            final AccessToken accessToken = new AccessToken(); // a new instance is built per invocation
+            accessToken.setAccessToken("mock-access-token");
+            accessToken.setAdditionalParameter("id_token", "mock-id-token"); // extra "id_token" parameter alongside the access token
+            return accessToken;
+        }
+    }
 }

Reply via email to