This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f3a0539d94 NIFI-15172 Add support for Web Identity to
AmazonMSKConnectionService (#10489)
f3a0539d94 is described below
commit f3a0539d94735e7c51b64935188a385b1b18e6cb
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Nov 10 21:08:11 2025 +0100
NIFI-15172 Add support for Web Identity to AmazonMSKConnectionService
(#10489)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-kafka-service-aws/pom.xml | 10 +
.../service/aws/AmazonMSKConnectionService.java | 281 ++++++++++++++++++++-
.../aws/AmazonMSKCredentialsCallbackHandler.java | 93 +++++++
.../aws/AmazonMSKConnectionServiceTest.java | 86 +++++++
.../AmazonMSKCredentialsCallbackHandlerTest.java | 88 +++++++
.../kafka/service/Kafka3ConnectionService.java | 6 +-
.../nifi/kafka/shared/aws/AmazonMSKProperty.java | 35 +++
.../shared/component/KafkaClientComponent.java | 17 +-
.../shared/login/AwsMskIamLoginConfigProvider.java | 2 +-
.../nifi/kafka/shared/property/AwsRoleSource.java | 3 +-
.../provider/StandardKafkaPropertyProvider.java | 11 +-
.../login/AwsMskIamLoginConfigProviderTest.java | 37 +++
.../StandardKafkaPropertyProviderTest.java | 38 +++
13 files changed, 697 insertions(+), 10 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
index 2c1e7df587..f4585c257d 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/pom.xml
@@ -61,5 +61,15 @@
<artifactId>aws-msk-iam-auth</artifactId>
<version>2.3.5</version>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <version>${software.amazon.awssdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ <version>${software.amazon.awssdk.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
index 32236d120a..0f064b4a75 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java
@@ -16,18 +16,48 @@
*/
package org.apache.nifi.kafka.service.aws;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.service.Kafka3ConnectionService;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.AwsRoleSource;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextProvider;
+import org.apache.nifi.util.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
+import
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+import javax.net.ssl.SSLContext;
+import java.net.URI;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
@Tags({"AWS", "MSK", "streaming", "kafka"})
@CapabilityDescription("Provides and manages connections to AWS MSK Kafka
Brokers for producer or consumer operations.")
@@ -42,6 +72,57 @@ public class AmazonMSKConnectionService extends
Kafka3ConnectionService {
.defaultValue(SaslMechanism.AWS_MSK_IAM)
.build();
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER =
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+ private static final int MIN_SESSION_DURATION_SECONDS = 900;
+ private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity Session Time")
+ .description("Session time for AWS STS AssumeRoleWithWebIdentity
(between 900 seconds and 3600 seconds).")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(true)
+ .defaultValue("%d sec".formatted(MAX_SESSION_DURATION_SECONDS))
+ .addValidator(StandardValidators.createTimePeriodValidator(
+ MIN_SESSION_DURATION_SECONDS, TimeUnit.SECONDS,
MAX_SESSION_DURATION_SECONDS, TimeUnit.SECONDS))
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Region")
+ .description("Region identifier used for the AWS Security Token
Service when exchanging Web Identity tokens.")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Endpoint")
+ .description("Optional endpoint override for the AWS Security
Token Service.")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor
AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER = new PropertyDescriptor.Builder()
+ .name("AWS Web Identity SSL Context Provider")
+ .description("SSL Context Service used when communicating with AWS
STS for Web Identity federation.")
+ .identifiesControllerService(SSLContextProvider.class)
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .build();
+
private final List<PropertyDescriptor> supportedPropertyDescriptors;
public AmazonMSKConnectionService() {
@@ -58,6 +139,11 @@ public class AmazonMSKConnectionService extends
Kafka3ConnectionService {
descriptors.add(KafkaClientComponent.AWS_PROFILE_NAME);
descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_ARN);
descriptors.add(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME);
+ descriptors.add(AWS_WEB_IDENTITY_TOKEN_PROVIDER);
+ descriptors.add(AWS_WEB_IDENTITY_SESSION_TIME);
+ descriptors.add(AWS_WEB_IDENTITY_STS_REGION);
+ descriptors.add(AWS_WEB_IDENTITY_STS_ENDPOINT);
+ descriptors.add(AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER);
}
}
@@ -69,6 +155,199 @@ public class AmazonMSKConnectionService extends
Kafka3ConnectionService {
return supportedPropertyDescriptors;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final Collection<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final AwsRoleSource roleSource =
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ if
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+ .valid(false)
+ .explanation("AWS Web Identity Token Provider must be
configured when AWS Role Source is set to Web Identity Provider")
+ .build());
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ protected Properties getProducerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
+ final Properties properties =
super.getProducerProperties(propertyContext, defaultProperties);
+ setAuthenticationProperties(properties, propertyContext);
+ return properties;
+ }
+
+ @Override
+ protected Properties getConsumerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
+ final Properties properties =
super.getConsumerProperties(propertyContext, defaultProperties);
+ setAuthenticationProperties(properties, propertyContext);
+ return properties;
+ }
+
+ @Override
+ protected Properties getClientProperties(final PropertyContext
propertyContext) {
+ final Properties properties =
super.getClientProperties(propertyContext);
+ setAuthenticationProperties(properties, propertyContext);
+ return properties;
+ }
+
+ private void setAuthenticationProperties(final Properties properties,
final PropertyContext propertyContext) {
+ final AwsRoleSource roleSource =
propertyContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ final AwsCredentialsProvider credentialsProvider =
createWebIdentityCredentialsProvider(propertyContext);
+
properties.put(AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty(),
credentialsProvider);
+ }
+ }
+
+ private AwsCredentialsProvider createWebIdentityCredentialsProvider(final
PropertyContext propertyContext) {
+ final OAuth2AccessTokenProvider tokenProvider =
propertyContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ if (tokenProvider == null) {
+ throw new IllegalStateException("AWS Web Identity Token Provider
is required when AWS Role Source is set to Web Identity Provider");
+ }
+
+ final String roleArn =
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+ final String roleSessionName =
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+ final Long sessionTimeSeconds =
propertyContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME).asTimePeriod(TimeUnit.SECONDS);
+ final Integer sessionSeconds = sessionTimeSeconds == null ? null :
sessionTimeSeconds.intValue();
+ final String stsRegionId =
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_REGION).getValue();
+ final String stsEndpoint =
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_ENDPOINT).getValue();
+ final SSLContextProvider sslContextProvider =
propertyContext.getProperty(AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER).asControllerService(SSLContextProvider.class);
+
+ final ApacheHttpClient.Builder httpClientBuilder =
ApacheHttpClient.builder();
+
+ if (sslContextProvider != null) {
+ final SSLContext sslContext = sslContextProvider.createContext();
+ httpClientBuilder.socketFactory(new
SSLConnectionSocketFactory(sslContext));
+ }
+
+ final StsClientBuilder stsClientBuilder =
StsClient.builder().httpClient(httpClientBuilder.build());
+
+ if (!StringUtils.isBlank(stsRegionId)) {
+ stsClientBuilder.region(Region.of(stsRegionId));
+ }
+
+ if (!StringUtils.isBlank(stsEndpoint)) {
+ stsClientBuilder.endpointOverride(URI.create(stsEndpoint));
+ }
+
+ final StsClient stsClient = stsClientBuilder.build();
+
+ return new WebIdentityCredentialsProvider(stsClient, tokenProvider,
roleArn, roleSessionName, sessionSeconds);
+ }
+
+ private static final class WebIdentityCredentialsProvider implements
AwsCredentialsProvider, AutoCloseable {
+ private static final Duration SKEW = Duration.ofSeconds(60);
+
+ private final StsClient stsClient;
+ private final OAuth2AccessTokenProvider tokenProvider;
+ private final String roleArn;
+ private final String roleSessionName;
+ private final Integer sessionSeconds;
+
+ private volatile AwsSessionCredentials cachedCredentials;
+ private volatile Instant expiration;
+
+ private WebIdentityCredentialsProvider(final StsClient stsClient,
+ final OAuth2AccessTokenProvider
tokenProvider,
+ final String roleArn,
+ final String roleSessionName,
+ final Integer sessionSeconds) {
+ this.stsClient = Objects.requireNonNull(stsClient, "stsClient
required");
+ this.tokenProvider = Objects.requireNonNull(tokenProvider,
"tokenProvider required");
+ this.roleArn = Objects.requireNonNull(roleArn, "roleArn required");
+ this.roleSessionName = Objects.requireNonNull(roleSessionName,
"roleSessionName required");
+ this.sessionSeconds = sessionSeconds;
+ }
+
+ @Override
+ public AwsCredentials resolveCredentials() {
+ final Instant now = Instant.now();
+ final AwsSessionCredentials currentCredentials = cachedCredentials;
+ final Instant currentExpiration = expiration;
+
+ AwsSessionCredentials resolvedCredentials;
+ if (isCacheValid(now, currentCredentials, currentExpiration)) {
+ resolvedCredentials = currentCredentials;
+ } else {
+ synchronized (this) {
+ final Instant refreshedNow = Instant.now();
+ if (isCacheValid(refreshedNow, cachedCredentials,
expiration)) {
+ resolvedCredentials = cachedCredentials;
+ } else {
+ resolvedCredentials = refreshCredentials();
+ }
+ }
+ }
+
+ return resolvedCredentials;
+ }
+
+ private String getWebIdentityToken() {
+ final AccessToken accessToken = tokenProvider.getAccessDetails();
+ if (accessToken == null) {
+ throw new IllegalStateException("OAuth2AccessTokenProvider
returned null AccessToken");
+ }
+
+ final Map<String, Object> additionalParameters =
accessToken.getAdditionalParameters();
+ String tokenValue = null;
+ if (additionalParameters != null) {
+ final Object idTokenValue =
additionalParameters.get("id_token");
+ if (idTokenValue instanceof String idToken) {
+ if (StringUtils.isBlank(idToken)) {
+ throw new
IllegalStateException("OAuth2AccessTokenProvider returned an empty id_token");
+ }
+ tokenValue = idToken;
+ }
+ }
+
+ if (tokenValue == null) {
+ final String accessTokenValue = accessToken.getAccessToken();
+ if (StringUtils.isBlank(accessTokenValue)) {
+ throw new IllegalStateException("No usable token found in
AccessToken (id_token or access_token)");
+ }
+ tokenValue = accessTokenValue;
+ }
+
+ return tokenValue;
+ }
+
+ private boolean isCacheValid(final Instant referenceTime, final
AwsSessionCredentials credentials, final Instant credentialsExpiration) {
+ return credentials != null
+ && credentialsExpiration != null
+ &&
referenceTime.isBefore(credentialsExpiration.minus(SKEW));
+ }
+
+ private AwsSessionCredentials refreshCredentials() {
+ final String webIdentityToken = getWebIdentityToken();
+
+ final AssumeRoleWithWebIdentityRequest.Builder requestBuilder =
AssumeRoleWithWebIdentityRequest.builder()
+ .roleArn(roleArn)
+ .roleSessionName(roleSessionName)
+ .webIdentityToken(webIdentityToken);
+
+ if (sessionSeconds != null) {
+ requestBuilder.durationSeconds(sessionSeconds);
+ }
+
+ final AssumeRoleWithWebIdentityResponse response =
stsClient.assumeRoleWithWebIdentity(requestBuilder.build());
+ final Credentials temporaryCredentials = response.credentials();
+ final AwsSessionCredentials sessionCredentials =
AwsSessionCredentials.create(
+ temporaryCredentials.accessKeyId(),
temporaryCredentials.secretAccessKey(), temporaryCredentials.sessionToken());
+
+ cachedCredentials = sessionCredentials;
+ expiration = temporaryCredentials.expiration();
+ return sessionCredentials;
+ }
+
+ @Override
+ public void close() {
+ stsClient.close();
+ }
+ }
+
@Override
public void migrateProperties(final PropertyConfiguration config) {
// For backward compatibility: if an AWS Profile Name was configured
previously,
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java
new file mode 100644
index 0000000000..691f2e1e9f
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Callback handler that supplies AWS credentials sourced from NiFi
configuration for AWS MSK IAM authentication.
+ */
+public class AmazonMSKCredentialsCallbackHandler implements
AuthenticateCallbackHandler {
+
+ private AwsCredentialsProvider credentialsProvider;
+
+ @Override
+ public void configure(final Map<String, ?> configs, final String
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+ if (!IAMLoginModule.MECHANISM.equals(saslMechanism)) {
+ throw new IllegalArgumentException("Unexpected SASL mechanism: " +
saslMechanism);
+ }
+
+ final Object provider =
configs.get(AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty());
+ if (!(provider instanceof AwsCredentialsProvider)) {
+ throw new IllegalArgumentException("Kafka configuration missing
AWS Web Identity credentials provider");
+ }
+
+ credentialsProvider = (AwsCredentialsProvider) provider;
+
+ final AppConfigurationEntry loginModuleEntry =
jaasConfigEntries.stream()
+ .filter(entry ->
IAMLoginModule.class.getName().equals(entry.getLoginModuleName()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("JAAS
configuration missing IAMLoginModule entry"));
+
+ final Map<String, ?> jaasOptions = loginModuleEntry.getOptions();
+ final Object roleArn = jaasOptions.get("awsRoleArn");
+ final Object roleSessionName = jaasOptions.get("awsRoleSessionName");
+ if (roleArn == null || roleSessionName == null) {
+ throw new IllegalStateException("JAAS configuration missing
required awsRoleArn or awsRoleSessionName options for Web Identity
authentication");
+ }
+ }
+
+ @Override
+ public void handle(final Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
+ for (final Callback callback : callbacks) {
+ if (callback instanceof AWSCredentialsCallback
awsCredentialsCallback) {
+ handleCredentialsCallback(awsCredentialsCallback);
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unsupported callback type
[%s]".formatted(callback.getClass().getName()));
+ }
+ }
+ }
+
+ private void handleCredentialsCallback(final AWSCredentialsCallback
callback) {
+ try {
+ final AwsCredentials awsCredentials =
credentialsProvider.resolveCredentials();
+ callback.setAwsCredentials(awsCredentials);
+ } catch (final Exception e) {
+ callback.setLoadingException(e);
+ }
+ }
+
+ /**
+ * No resources to close because the credentials provider lifecycle is
managed by the NiFi service.
+ */
+ @Override
+ public void close() {
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
index c3dfc79e8b..e174c95719 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionServiceTest.java
@@ -16,12 +16,29 @@
*/
package org.apache.nifi.kafka.service.aws;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class AmazonMSKConnectionServiceTest {
@@ -47,4 +64,73 @@ class AmazonMSKConnectionServiceTest {
runner.enableControllerService(service);
}
+
+ @Test
+ void testWebIdentityWithoutCredentialsServiceNotValid() throws
InitializationException {
+ final AmazonMSKConnectionService service = new
AmazonMSKConnectionService();
+ runner.addControllerService(SERVICE_ID, service);
+
+ runner.setProperty(service,
AmazonMSKConnectionService.BOOTSTRAP_SERVERS, BOOTSTRAP_SERVERS);
+ runner.setProperty(service, KafkaClientComponent.AWS_ROLE_SOURCE,
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+ runner.setProperty(service, KafkaClientComponent.AWS_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/TestRole");
+ runner.setProperty(service,
KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, "nifi-session");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ void testWebIdentityWithTokenProviderAddsProperty() throws
InitializationException {
+ final TestAmazonMSKConnectionService service = new
TestAmazonMSKConnectionService();
+ runner.addControllerService(SERVICE_ID, service);
+
+ final MockOAuth2AccessTokenProvider tokenProvider = new
MockOAuth2AccessTokenProvider();
+ runner.addControllerService("webIdentityToken", tokenProvider);
+ runner.enableControllerService(tokenProvider);
+
+ runner.setProperty(service,
AmazonMSKConnectionService.BOOTSTRAP_SERVERS, BOOTSTRAP_SERVERS);
+ runner.setProperty(service, KafkaClientComponent.AWS_ROLE_SOURCE,
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+ runner.setProperty(service, KafkaClientComponent.AWS_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/TestRole");
+ runner.setProperty(service,
KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME, "nifi-session");
+ runner.setProperty(service,
AmazonMSKConnectionService.AWS_WEB_IDENTITY_TOKEN_PROVIDER, "webIdentityToken");
+ runner.setProperty(service,
AmazonMSKConnectionService.AWS_WEB_IDENTITY_STS_REGION, "us-east-1");
+
+ runner.assertValid(service);
+
+ runner.enableControllerService(service);
+
+ final Map<PropertyDescriptor, String> configuredProperties = new
HashMap<>();
+ configuredProperties.put(AmazonMSKConnectionService.BOOTSTRAP_SERVERS,
BOOTSTRAP_SERVERS);
+ configuredProperties.put(KafkaClientComponent.AWS_ROLE_SOURCE,
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+ configuredProperties.put(KafkaClientComponent.AWS_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/TestRole");
+
configuredProperties.put(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME,
"nifi-session");
+
configuredProperties.put(AmazonMSKConnectionService.AWS_WEB_IDENTITY_TOKEN_PROVIDER,
"webIdentityToken");
+
configuredProperties.put(AmazonMSKConnectionService.AWS_WEB_IDENTITY_STS_REGION,
"us-east-1");
+
+ final MockConfigurationContext configurationContext = new
MockConfigurationContext(service, configuredProperties,
+ runner.getProcessContext().getControllerServiceLookup(),
Collections.emptyMap());
+ configurationContext.setValidateExpressions(false);
+ final Properties properties =
service.buildConsumerProperties(configurationContext);
+
+ final Object provider =
properties.get(AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty());
+
+ assertNotNull(provider);
+ assertTrue(provider instanceof AwsCredentialsProvider);
+ }
+
+ private static class MockOAuth2AccessTokenProvider extends
AbstractControllerService implements OAuth2AccessTokenProvider {
+ @Override
+ public AccessToken getAccessDetails() {
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken("mock-access-token");
+ accessToken.setAdditionalParameter("id_token", "mock-id-token");
+ return accessToken;
+ }
+ }
+
+ private static class TestAmazonMSKConnectionService extends
AmazonMSKConnectionService {
+ Properties buildConsumerProperties(final MockConfigurationContext
context) {
+ final Properties clientProperties =
super.getClientProperties(context);
+ return super.getConsumerProperties(context, clientProperties);
+ }
+ }
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandlerTest.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandlerTest.java
new file mode 100644
index 0000000000..63903a5dbb
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/test/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandlerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class AmazonMSKCredentialsCallbackHandlerTest {
+
+ @Test
+ void testHandleCredentialsCallback() throws IOException,
UnsupportedCallbackException {
+ final AwsCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKey",
"secretKey"));
+
+ final AuthenticateCallbackHandler handler = new
AmazonMSKCredentialsCallbackHandler();
+ final Map<String, Object> configs = Map.of(
+
AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty(),
credentialsProvider
+ );
+
+ final Map<String, ?> options = Map.of(
+ "awsRoleArn", "arn:aws:iam::123456789012:role/TestRole",
+ "awsRoleSessionName", "nifi-session"
+ );
+ final AppConfigurationEntry configurationEntry = new
AppConfigurationEntry(IAMLoginModule.class.getName(),
LoginModuleControlFlag.REQUIRED, options);
+
+ handler.configure(configs, IAMLoginModule.MECHANISM,
List.of(configurationEntry));
+
+ final AWSCredentialsCallback callback = new AWSCredentialsCallback();
+ handler.handle(new Callback[]{callback});
+
+ assertNotNull(callback.getAwsCredentials());
+ assertEquals("accessKey", callback.getAwsCredentials().accessKeyId());
+ }
+
+ @Test
+ void testConfigureWithoutCredentialsServiceThrows() {
+ final AuthenticateCallbackHandler handler = new
AmazonMSKCredentialsCallbackHandler();
+ final Map<String, Object> configs = Map.of();
+ final AppConfigurationEntry configurationEntry = new
AppConfigurationEntry(IAMLoginModule.class.getName(),
LoginModuleControlFlag.REQUIRED, Map.of());
+
+ assertThrows(IllegalArgumentException.class, () ->
handler.configure(configs, IAMLoginModule.MECHANISM,
List.of(configurationEntry)));
+ }
+
+ @Test
+ void testConfigureWithoutRequiredJaasOptionsThrows() {
+ final AwsCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKey",
"secretKey"));
+ final AuthenticateCallbackHandler handler = new
AmazonMSKCredentialsCallbackHandler();
+ final Map<String, Object> configs = Map.of(
+
AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty(),
credentialsProvider
+ );
+
+ final Map<String, ?> options = Map.of();
+ final AppConfigurationEntry configurationEntry = new
AppConfigurationEntry(IAMLoginModule.class.getName(),
LoginModuleControlFlag.REQUIRED, options);
+
+ assertThrows(IllegalStateException.class, () ->
handler.configure(configs, IAMLoginModule.MECHANISM,
List.of(configurationEntry)));
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 8f95f4d03a..3291d6aa1f 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -318,7 +318,7 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
return results;
}
- private Properties getProducerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
+ protected Properties getProducerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
final Properties properties = new Properties();
properties.putAll(defaultProperties);
@@ -332,7 +332,7 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
return properties;
}
- private Properties getConsumerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
+ protected Properties getConsumerProperties(final PropertyContext
propertyContext, final Properties defaultProperties) {
final Properties properties = new Properties();
properties.putAll(defaultProperties);
@@ -346,7 +346,7 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
return properties;
}
- private Properties getClientProperties(final PropertyContext
propertyContext) {
+ protected Properties getClientProperties(final PropertyContext
propertyContext) {
final Properties properties = new Properties();
final String configuredBootstrapServers =
propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKProperty.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKProperty.java
new file mode 100644
index 0000000000..164188ee27
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKProperty.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.aws;
+
+/**
+ * AWS MSK-specific Kafka property keys shared between components.
+ */
+public enum AmazonMSKProperty {
+ NIFI_AWS_MSK_CREDENTIALS_PROVIDER("nifi.aws.msk.credentials.provider"),
+
NIFI_AWS_MSK_CALLBACK_HANDLER_CLASS("org.apache.nifi.kafka.service.aws.AmazonMSKCredentialsCallbackHandler");
+
+ private final String property;
+
+ AmazonMSKProperty(final String property) {
+ this.property = property;
+ }
+
+ public String getProperty() {
+ return property;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 5d88cf108e..65b3c6765f 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -140,7 +140,8 @@ public interface KafkaClientComponent {
.required(true)
.dependsOn(
KafkaClientComponent.AWS_ROLE_SOURCE,
- AwsRoleSource.SPECIFIED_ROLE
+ AwsRoleSource.SPECIFIED_ROLE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -152,12 +153,24 @@ public interface KafkaClientComponent {
.required(true)
.dependsOn(
KafkaClientComponent.AWS_ROLE_SOURCE,
- AwsRoleSource.SPECIFIED_ROLE
+ AwsRoleSource.SPECIFIED_ROLE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
+ PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity Token Provider")
+ .description("Controller Service providing tokens with OAuth2
OpenID Connect for AWS Web Identity federation.")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(true)
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .build();
+
PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Service supporting SSL communication with Kafka
brokers")
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
index a6e02a6d48..404264a698 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
@@ -46,7 +46,7 @@ public class AwsMskIamLoginConfigProvider implements
LoginConfigProvider {
}
}
- if (roleSource == AwsRoleSource.SPECIFIED_ROLE) {
+ if (roleSource == AwsRoleSource.SPECIFIED_ROLE || roleSource ==
AwsRoleSource.WEB_IDENTITY_TOKEN) {
final String assumeRoleArn =
context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
final String assumeRoleSessionName =
context.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
index 669723addd..9aed1268c8 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/AwsRoleSource.java
@@ -24,7 +24,8 @@ import org.apache.nifi.components.DescribedValue;
public enum AwsRoleSource implements DescribedValue {
DEFAULT_PROFILE("Default Profile", "Use the default AWS credentials
provider chain to locate credentials."),
SPECIFIED_PROFILE("Specified Profile", "Use the configured AWS Profile
Name from the default credentials file."),
- SPECIFIED_ROLE("Specified Role", "Assume a specific AWS Role using the
configured Role ARN and Session Name.");
+ SPECIFIED_ROLE("Specified Role", "Assume a specific AWS Role using the
configured Role ARN and Session Name."),
+ WEB_IDENTITY_TOKEN("Web Identity Provider", "Obtain AWS MSK IAM
credentials using STS AssumeRoleWithWebIdentity and a configured OAuth2/OIDC
token provider.");
private final String displayName;
private final String description;
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
index 494629d2a4..f146e26f2e 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
@@ -20,6 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
import org.apache.nifi.kafka.shared.login.DelegatingLoginConfigProvider;
import org.apache.nifi.kafka.shared.login.LoginConfigProvider;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
@@ -35,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
+import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_MECHANISM;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SECURITY_PROTOCOL;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SSL_CONTEXT_SERVICE;
@@ -89,8 +91,13 @@ public class StandardKafkaPropertyProvider implements
KafkaPropertyProvider {
final SaslMechanism saslMechanism =
context.getProperty(SASL_MECHANISM).asAllowableValue(SaslMechanism.class);
if (saslMechanism == SaslMechanism.GSSAPI &&
isCustomKerberosLoginFound()) {
properties.put(SASL_LOGIN_CLASS.getProperty(),
SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
- } else if (saslMechanism == SaslMechanism.AWS_MSK_IAM &&
isAwsMskIamCallbackHandlerFound()) {
-
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(),
SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+ } else if (saslMechanism == SaslMechanism.AWS_MSK_IAM) {
+ final PropertyValue tokenProviderProperty =
context.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER);
+ if (tokenProviderProperty != null &&
tokenProviderProperty.isSet()) {
+
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(),
AmazonMSKProperty.NIFI_AWS_MSK_CALLBACK_HANDLER_CLASS.getProperty());
+ } else if (isAwsMskIamCallbackHandlerFound()) {
+
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(),
SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+ }
}
}
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
index 6e49970a9f..0a519ac2e4 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProviderTest.java
@@ -19,9 +19,13 @@ package org.apache.nifi.kafka.shared.login;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.reporting.InitializationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -70,4 +74,37 @@ class AwsMskIamLoginConfigProviderTest {
assertTrue(configuration.contains("awsRoleArn=\"arn:aws:iam::123456789012:role/MyRole\""),
"awsRoleArn JAAS option not present");
assertTrue(configuration.contains("awsRoleSessionName=\"MySession\""),
"awsRoleSessionName JAAS option not present");
}
+
+ @Test
+ void testConfigurationWithWebIdentityProvider() throws
InitializationException {
+ runner.setProperty(KafkaClientComponent.SASL_MECHANISM,
SaslMechanism.AWS_MSK_IAM);
+ runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE,
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+
+ runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/WebIdentityRole");
+ runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME,
"WebIdentitySession");
+
+ final MockOAuth2AccessTokenProvider tokenProvider = new
MockOAuth2AccessTokenProvider();
+ runner.addControllerService("webIdentityToken", tokenProvider);
+ runner.enableControllerService(tokenProvider);
+
runner.setProperty(KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER,
"webIdentityToken");
+
+ final PropertyContext context = runner.getProcessContext();
+ final String configuration = provider.getConfiguration(context);
+
+ assertNotNull(configuration);
+
assertTrue(configuration.contains("awsRoleArn=\"arn:aws:iam::123456789012:role/WebIdentityRole\""),
+ "awsRoleArn JAAS option not present");
+
assertTrue(configuration.contains("awsRoleSessionName=\"WebIdentitySession\""),
+ "awsRoleSessionName JAAS option not present");
+ }
+
+ private static class MockOAuth2AccessTokenProvider extends
org.apache.nifi.controller.AbstractControllerService implements
OAuth2AccessTokenProvider {
+ @Override
+ public AccessToken getAccessDetails() {
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken("mock-access-token");
+ accessToken.setAdditionalParameter("id_token", "mock-id-token");
+ return accessToken;
+ }
+ }
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
index a3df7c4c0c..bc13001068 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
@@ -17,10 +17,16 @@
package org.apache.nifi.kafka.shared.property.provider;
import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKProperty;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.AwsRoleSource;
import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -77,4 +83,36 @@ class StandardKafkaPropertyProviderTest {
assertNotNull(saslConfigProperty, "SASL configuration not found");
assertTrue(saslConfigProperty.toString().contains(SCRAM_LOGIN_MODULE),
"SCRAM configuration not found");
}
+
+ @Test
+ void testGetPropertiesAwsMskIamWithCredentialsService() throws
InitializationException {
+ runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL,
SecurityProtocol.SASL_PLAINTEXT.name());
+ runner.setProperty(KafkaClientComponent.SASL_MECHANISM,
SaslMechanism.AWS_MSK_IAM);
+ runner.setProperty(KafkaClientComponent.AWS_ROLE_SOURCE,
AwsRoleSource.WEB_IDENTITY_TOKEN.name());
+ runner.setProperty(KafkaClientComponent.BOOTSTRAP_SERVERS,
"localhost:9092");
+
+ runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/TestRole");
+ runner.setProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME,
"TestSession");
+
+ final MockOAuth2AccessTokenProvider tokenProvider = new
MockOAuth2AccessTokenProvider();
+ runner.addControllerService("webIdentityToken", tokenProvider);
+ runner.enableControllerService(tokenProvider);
+
runner.setProperty(KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER,
"webIdentityToken");
+
+ final PropertyContext propertyContext = runner.getProcessContext();
+ final Map<String, Object> properties =
provider.getProperties(propertyContext);
+
+ final Object callbackHandler =
properties.get(KafkaClientProperty.SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty());
+
assertEquals(AmazonMSKProperty.NIFI_AWS_MSK_CALLBACK_HANDLER_CLASS.getProperty(),
callbackHandler);
+ }
+
+ private static class MockOAuth2AccessTokenProvider extends
AbstractControllerService implements OAuth2AccessTokenProvider {
+ @Override
+ public AccessToken getAccessDetails() {
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken("mock-access-token");
+ accessToken.setAdditionalParameter("id_token", "mock-id-token");
+ return accessToken;
+ }
+ }
}