exceptionfactory commented on code in PR #10489:
URL: https://github.com/apache/nifi/pull/10489#discussion_r2506960226


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java:
##########
@@ -152,12 +153,24 @@ public interface KafkaClientComponent {
             .required(true)
             .dependsOn(
                     KafkaClientComponent.AWS_ROLE_SOURCE,
-                    AwsRoleSource.SPECIFIED_ROLE
+                    AwsRoleSource.SPECIFIED_ROLE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
             )
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
 
+    PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+            .name("AWS Web Identity Token Provider")
+            .description("Controller Service providing OAuth2/OIDC tokens for AWS Web Identity federation.")

Review Comment:
   ```suggestion
               .description("Controller Service providing tokens with OAuth2 OpenID Connect for AWS Web Identity federation.")
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java:
##########
@@ -400,6 +406,17 @@ private void setSslProperties(final Properties properties, final PropertyContext
         }
     }
 
+    /**
+     * Hook for subclasses to customize Kafka client, producer, or consumer properties after the standard
+     * configuration has been applied.
+     *
+     * @param properties Kafka client properties to update
+     * @param propertyContext NiFi property context with component configuration
+     */
+    protected void customizeKafkaProperties(final Properties properties, final PropertyContext propertyContext) {

Review Comment:
   Rather than adding a callback hook for extension, which is ambiguous about 
which type of properties it applies to, I recommend changing the visibility of 
the `get...Properties()` methods to `protected`. Subclasses could then call the 
super method and make adjustments, while preserving the type of properties 
requested.
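
   A minimal sketch of that approach, assuming stand-in classes: 
`BaseConnectionServiceSketch`, `MskConnectionServiceSketch`, and the 
`example.credentials.provider` key are hypothetical and are not taken from the 
PR. Each property type keeps its own `protected` getter, and the subclass 
overrides only the ones it needs to adjust.

```java
import java.util.Properties;

// Hypothetical stand-in for the base connection service; not the actual NiFi class.
class BaseConnectionServiceSketch {
    // Protected instead of private so that subclasses can extend each property set.
    protected Properties getProducerProperties() {
        final Properties properties = new Properties();
        properties.put("acks", "all");
        return properties;
    }

    protected Properties getConsumerProperties() {
        final Properties properties = new Properties();
        properties.put("enable.auto.commit", "false");
        return properties;
    }
}

// Hypothetical subclass that adjusts producer properties only.
class MskConnectionServiceSketch extends BaseConnectionServiceSketch {
    @Override
    protected Properties getProducerProperties() {
        // Start from the standard configuration, then apply subclass adjustments.
        final Properties properties = super.getProducerProperties();
        properties.put("example.credentials.provider", "web-identity");
        return properties;
    }
}
```

   Consumer properties are untouched in this sketch because the subclass does 
not override `getConsumerProperties()`, which is the type distinction a single 
shared hook would not express.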



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKKafkaProperties.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.aws;
+
+/**
+ * AWS MSK-specific Kafka property keys shared between components.
+ */
+public final class AmazonMSKKafkaProperties {

Review Comment:
   `Kafka` is redundant in the name. Rather than declaring `public static final` 
constants in a class, I recommend changing this to an enum named 
`AmazonMSKProperty` with a public `getProperty()` method that returns the values.
   
   ```suggestion
   public enum AmazonMSKProperty {
   ```
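
   A sketch of the shape this could take. The constant name mirrors the one 
referenced elsewhere in the PR, but the key string 
`nifi.aws.msk.credentials.provider` is an assumption, and the enum is declared 
package-private here only so the sketch compiles in any file.

```java
// Hypothetical sketch of the suggested enum; the property key string is assumed.
enum AmazonMSKProperty {
    NIFI_AWS_MSK_CREDENTIALS_PROVIDER("nifi.aws.msk.credentials.provider");

    private final String property;

    AmazonMSKProperty(final String property) {
        this.property = property;
    }

    // Returns the Kafka configuration key associated with this constant.
    public String getProperty() {
        return property;
    }
}
```

   Callers would then write something like 
`properties.put(AmazonMSKProperty.NIFI_AWS_MSK_CREDENTIALS_PROVIDER.getProperty(), credentialsProvider)`.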



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return supportedPropertyDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+
+        final AwsRoleSource roleSource = 
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            if 
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+                results.add(new ValidationResult.Builder()
+                        
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+                        .valid(false)
+                        .explanation("AWS Web Identity Token Provider must be 
configured when AWS Role Source is set to Web Identity Provider")
+                        .build());
+            }
+
+            final PropertyValue sessionTimeProperty = 
validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);
+            if (sessionTimeProperty != null && sessionTimeProperty.isSet()) {
+                final Integer sessionSeconds = sessionTimeProperty.asInteger();
+                if (sessionSeconds == null || sessionSeconds < 
MIN_SESSION_DURATION_SECONDS || sessionSeconds > MAX_SESSION_DURATION_SECONDS) {
+                    results.add(new ValidationResult.Builder()
+                            
.subject(AWS_WEB_IDENTITY_SESSION_TIME.getDisplayName())
+                            .valid(false)
+                            .explanation(String.format("Session time must be 
between %d and %d seconds", MIN_SESSION_DURATION_SECONDS, 
MAX_SESSION_DURATION_SECONDS))
+                            .build());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    protected void customizeKafkaProperties(final Properties properties, final 
PropertyContext propertyContext) {
+        final AwsRoleSource roleSource = 
propertyContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            final AwsCredentialsProvider credentialsProvider = 
createWebIdentityCredentialsProvider(propertyContext);
+            
properties.put(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER, 
credentialsProvider);
+        }
+    }
+
+    private AwsCredentialsProvider createWebIdentityCredentialsProvider(final 
PropertyContext propertyContext) {
+        final OAuth2AccessTokenProvider tokenProvider = 
propertyContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+        if (tokenProvider == null) {
+            throw new IllegalStateException("AWS Web Identity Token Provider 
is required when AWS Role Source is set to Web Identity Provider");
+        }
+
+        final String roleArn = 
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+        final String roleSessionName = 
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+        final Integer sessionSeconds = 
propertyContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME).asInteger();
+        final String stsRegionId = 
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_REGION).getValue();
+        final String stsEndpoint = 
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_ENDPOINT).getValue();
+        final SSLContextProvider sslContextProvider = 
propertyContext.getProperty(AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
+
+        final ApacheHttpClient.Builder httpClientBuilder = 
ApacheHttpClient.builder();
+
+        if (sslContextProvider != null) {
+            final SSLContext sslContext = sslContextProvider.createContext();
+            httpClientBuilder.socketFactory(new 
SSLConnectionSocketFactory(sslContext));
+        }
+
+        final StsClientBuilder stsClientBuilder = 
StsClient.builder().httpClient(httpClientBuilder.build());
+
+        if (!StringUtils.isBlank(stsRegionId)) {
+            stsClientBuilder.region(Region.of(stsRegionId));
+        }
+
+        if (!StringUtils.isBlank(stsEndpoint)) {
+            stsClientBuilder.endpointOverride(URI.create(stsEndpoint));
+        }
+
+        final StsClient stsClient = stsClientBuilder.build();
+
+        return new WebIdentityCredentialsProvider(stsClient, tokenProvider, 
roleArn, roleSessionName, sessionSeconds);
+    }
+
+    private static final class WebIdentityCredentialsProvider implements 
AwsCredentialsProvider, AutoCloseable {
+        private static final Duration SKEW = Duration.ofSeconds(60);
+
+        private final StsClient stsClient;
+        private final OAuth2AccessTokenProvider tokenProvider;
+        private final String roleArn;
+        private final String roleSessionName;
+        private final Integer sessionSeconds;
+
+        private volatile AwsSessionCredentials cachedCredentials;
+        private volatile Instant expiration;
+
+        private WebIdentityCredentialsProvider(final StsClient stsClient,
+                                               final OAuth2AccessTokenProvider 
tokenProvider,
+                                               final String roleArn,
+                                               final String roleSessionName,
+                                               final Integer sessionSeconds) {
+            this.stsClient = Objects.requireNonNull(stsClient, "stsClient 
required");
+            this.tokenProvider = Objects.requireNonNull(tokenProvider, 
"tokenProvider required");
+            this.roleArn = Objects.requireNonNull(roleArn, "roleArn required");
+            this.roleSessionName = Objects.requireNonNull(roleSessionName, 
"roleSessionName required");
+            this.sessionSeconds = sessionSeconds;
+        }
+
+        @Override
+        public AwsCredentials resolveCredentials() {
+            final Instant now = Instant.now();
+            final AwsSessionCredentials currentCredentials = cachedCredentials;
+            final Instant currentExpiration = expiration;
+
+            if (currentCredentials != null && currentExpiration != null && now.isBefore(currentExpiration.minus(SKEW))) {
+                return currentCredentials;

Review Comment:
   Recommend refactoring to a single return. It may also be useful to add 
another private method for evaluating the expiration that encapsulates null 
checking.
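
   A simplified sketch of a single-return structure, under stated assumptions: 
`CachedCredentialsSketch`, its `Entry` record, and the `Supplier<String>` 
loader are hypothetical, with a `String` standing in for 
`AwsSessionCredentials`. Holding the value and its expiration in one immutable 
entry is a design choice of this sketch, not something the PR does.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical simplified cache; a String stands in for AwsSessionCredentials.
final class CachedCredentialsSketch {
    private static final Duration SKEW = Duration.ofSeconds(60);

    // Value and expiration are published together through one volatile reference.
    private record Entry(String credentials, Instant expiration) { }

    private final Supplier<String> loader;
    private final Duration lifetime;
    private volatile Entry entry;

    CachedCredentialsSketch(final Supplier<String> loader, final Duration lifetime) {
        this.loader = loader;
        this.lifetime = lifetime;
    }

    String resolveCredentials() {
        Entry current = entry;
        if (isExpired(current)) {
            synchronized (this) {
                // Check again under the lock in case another thread already refreshed.
                current = entry;
                if (isExpired(current)) {
                    current = new Entry(loader.get(), Instant.now().plus(lifetime));
                    entry = current;
                }
            }
        }
        return current.credentials();
    }

    // Encapsulates the null check and the skew comparison in one place.
    private static boolean isExpired(final Entry candidate) {
        return candidate == null || !Instant.now().isBefore(candidate.expiration().minus(SKEW));
    }
}
```

   The method has one exit point, and `isExpired()` replaces the repeated 
three-part condition in both the unsynchronized and synchronized checks.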



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
         return supportedPropertyDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+
+        final AwsRoleSource roleSource = 
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            if 
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+                results.add(new ValidationResult.Builder()
+                        
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+                        .valid(false)
+                        .explanation("AWS Web Identity Token Provider must be 
configured when AWS Role Source is set to Web Identity Provider")
+                        .build());
+            }
+
+            final PropertyValue sessionTimeProperty = validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);

Review Comment:
   Can `StandardValidators.createTimePeriodValidator()` be used to enforce the 
minimum and maximum values?



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -42,6 +73,56 @@ public class AmazonMSKConnectionService extends 
Kafka3ConnectionService {
             .defaultValue(SaslMechanism.AWS_MSK_IAM)
             .build();
 
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = 
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+    private static final int MIN_SESSION_DURATION_SECONDS = 900;
+    private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity Session Time")
+            .description("Session time in seconds for AWS STS 
AssumeRoleWithWebIdentity (between 900 and 3600 seconds).")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .defaultValue(String.valueOf(MAX_SESSION_DURATION_SECONDS))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Region")
+            .description("Region identifier used for the AWS Security Token 
Service when exchanging Web Identity tokens.")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Endpoint Override")
+            .description("Optional endpoint override for the AWS Security 
Token Service.")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("AWS Web Identity SSL Context Service")
+            .description("SSL Context Service used when communicating with AWS STS for Web Identity federation.")
+            .identifiesControllerService(SSLContextService.class)

Review Comment:
   This should use `SSLContextProvider`:
   
   ```suggestion
               .identifiesControllerService(SSLContextProvider.class)
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
         return supportedPropertyDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+
+        final AwsRoleSource roleSource = 
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            if 
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+                results.add(new ValidationResult.Builder()
+                        
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+                        .valid(false)
+                        .explanation("AWS Web Identity Token Provider must be 
configured when AWS Role Source is set to Web Identity Provider")
+                        .build());
+            }
+
+            final PropertyValue sessionTimeProperty = 
validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);
+            if (sessionTimeProperty != null && sessionTimeProperty.isSet()) {
+                final Integer sessionSeconds = sessionTimeProperty.asInteger();
+                if (sessionSeconds == null || sessionSeconds < 
MIN_SESSION_DURATION_SECONDS || sessionSeconds > MAX_SESSION_DURATION_SECONDS) {
+                    results.add(new ValidationResult.Builder()
+                            
.subject(AWS_WEB_IDENTITY_SESSION_TIME.getDisplayName())
+                            .valid(false)
+                            .explanation(String.format("Session time must be 
between %d and %d seconds", MIN_SESSION_DURATION_SECONDS, 
MAX_SESSION_DURATION_SECONDS))
+                            .build());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    protected void customizeKafkaProperties(final Properties properties, final 
PropertyContext propertyContext) {
+        final AwsRoleSource roleSource = 
propertyContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            final AwsCredentialsProvider credentialsProvider = 
createWebIdentityCredentialsProvider(propertyContext);
+            
properties.put(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER, 
credentialsProvider);
+        }
+    }
+
+    private AwsCredentialsProvider createWebIdentityCredentialsProvider(final 
PropertyContext propertyContext) {
+        final OAuth2AccessTokenProvider tokenProvider = 
propertyContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+        if (tokenProvider == null) {
+            throw new IllegalStateException("AWS Web Identity Token Provider 
is required when AWS Role Source is set to Web Identity Provider");
+        }
+
+        final String roleArn = 
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+        final String roleSessionName = 
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+        final Integer sessionSeconds = 
propertyContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME).asInteger();
+        final String stsRegionId = 
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_REGION).getValue();
+        final String stsEndpoint = 
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_ENDPOINT).getValue();
+        final SSLContextProvider sslContextProvider = 
propertyContext.getProperty(AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
+
+        final ApacheHttpClient.Builder httpClientBuilder = 
ApacheHttpClient.builder();
+
+        if (sslContextProvider != null) {
+            final SSLContext sslContext = sslContextProvider.createContext();
+            httpClientBuilder.socketFactory(new 
SSLConnectionSocketFactory(sslContext));
+        }
+
+        final StsClientBuilder stsClientBuilder = 
StsClient.builder().httpClient(httpClientBuilder.build());
+
+        if (!StringUtils.isBlank(stsRegionId)) {
+            stsClientBuilder.region(Region.of(stsRegionId));
+        }
+
+        if (!StringUtils.isBlank(stsEndpoint)) {
+            stsClientBuilder.endpointOverride(URI.create(stsEndpoint));
+        }
+
+        final StsClient stsClient = stsClientBuilder.build();
+
+        return new WebIdentityCredentialsProvider(stsClient, tokenProvider, 
roleArn, roleSessionName, sessionSeconds);
+    }
+
+    private static final class WebIdentityCredentialsProvider implements 
AwsCredentialsProvider, AutoCloseable {
+        private static final Duration SKEW = Duration.ofSeconds(60);
+
+        private final StsClient stsClient;
+        private final OAuth2AccessTokenProvider tokenProvider;
+        private final String roleArn;
+        private final String roleSessionName;
+        private final Integer sessionSeconds;
+
+        private volatile AwsSessionCredentials cachedCredentials;
+        private volatile Instant expiration;
+
+        private WebIdentityCredentialsProvider(final StsClient stsClient,
+                                               final OAuth2AccessTokenProvider 
tokenProvider,
+                                               final String roleArn,
+                                               final String roleSessionName,
+                                               final Integer sessionSeconds) {
+            this.stsClient = Objects.requireNonNull(stsClient, "stsClient 
required");
+            this.tokenProvider = Objects.requireNonNull(tokenProvider, 
"tokenProvider required");
+            this.roleArn = Objects.requireNonNull(roleArn, "roleArn required");
+            this.roleSessionName = Objects.requireNonNull(roleSessionName, 
"roleSessionName required");
+            this.sessionSeconds = sessionSeconds;
+        }
+
+        @Override
+        public AwsCredentials resolveCredentials() {
+            final Instant now = Instant.now();
+            final AwsSessionCredentials currentCredentials = cachedCredentials;
+            final Instant currentExpiration = expiration;
+
+            if (currentCredentials != null && currentExpiration != null && 
now.isBefore(currentExpiration.minus(SKEW))) {
+                return currentCredentials;
+            }
+
+            synchronized (this) {
+                if (cachedCredentials != null && expiration != null && 
Instant.now().isBefore(expiration.minus(SKEW))) {
+                    return cachedCredentials;
+                }
+
+                final String webIdentityToken = getWebIdentityToken();
+
+                final AssumeRoleWithWebIdentityRequest.Builder requestBuilder 
= AssumeRoleWithWebIdentityRequest.builder()
+                        .roleArn(roleArn)
+                        .roleSessionName(roleSessionName)
+                        .webIdentityToken(webIdentityToken);
+
+                if (sessionSeconds != null) {
+                    requestBuilder.durationSeconds(sessionSeconds);
+                }
+
+                final AssumeRoleWithWebIdentityResponse response = 
stsClient.assumeRoleWithWebIdentity(requestBuilder.build());
+                final Credentials temporaryCredentials = 
response.credentials();
+                final AwsSessionCredentials sessionCredentials = 
AwsSessionCredentials.create(
+                        temporaryCredentials.accessKeyId(), 
temporaryCredentials.secretAccessKey(), temporaryCredentials.sessionToken());
+
+                cachedCredentials = sessionCredentials;
+                expiration = temporaryCredentials.expiration();
+                return sessionCredentials;
+            }
+        }
+
+        private String getWebIdentityToken() {
+            final AccessToken accessToken = tokenProvider.getAccessDetails();
+            if (accessToken == null) {
+                throw new IllegalStateException("OAuth2AccessTokenProvider 
returned null AccessToken");
+            }
+
+            final Map<String, Object> additionalParameters = 
accessToken.getAdditionalParameters();
+            if (additionalParameters != null) {
+                final Object idTokenValue = 
additionalParameters.get("id_token");
+                if (idTokenValue instanceof String idToken && 
!StringUtils.isBlank(idToken)) {
+                    return idToken;
+                }
+                if (idTokenValue instanceof String idTokenBlank && 
StringUtils.isBlank(idTokenBlank)) {
+                    throw new IllegalStateException("OAuth2AccessTokenProvider 
returned an empty id_token");
+                }
+            }
+
+            final String accessTokenValue = accessToken.getAccessToken();
+            if (StringUtils.isBlank(accessTokenValue)) {
+                throw new IllegalStateException("No usable token found in AccessToken (id_token or access_token)");
+            }
+
+            return accessTokenValue;

Review Comment:
   For clarity of implementation, I recommend refactoring to a single return.
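
   One possible single-return shape, sketched with plain Java types. 
`WebIdentityTokenSketch.selectToken` is a hypothetical helper that takes the 
additional parameters map and the access token value directly instead of an 
`AccessToken`, and it uses `String.isBlank()` in place of `StringUtils`. The 
selection rules are intended to match the method above: a non-blank `id_token` 
wins, a blank `id_token` is an error, and otherwise the access token is used.

```java
import java.util.Map;

// Hypothetical helper illustrating the token selection with a single return.
final class WebIdentityTokenSketch {
    private WebIdentityTokenSketch() {
    }

    static String selectToken(final Map<String, Object> additionalParameters, final String accessToken) {
        final Object idToken = additionalParameters == null ? null : additionalParameters.get("id_token");

        final String token;
        if (idToken instanceof String idTokenValue) {
            // An id_token that is present but blank is treated as a provider error.
            if (idTokenValue.isBlank()) {
                throw new IllegalStateException("OAuth2AccessTokenProvider returned an empty id_token");
            }
            token = idTokenValue;
        } else if (accessToken == null || accessToken.isBlank()) {
            throw new IllegalStateException("No usable token found in AccessToken (id_token or access_token)");
        } else {
            // No id_token available: fall back to the access token.
            token = accessToken;
        }
        return token;
    }
}
```

   Error cases still throw early, but every successful path assigns `token` 
once and leaves through the same `return` statement.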



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -42,6 +73,56 @@ public class AmazonMSKConnectionService extends 
Kafka3ConnectionService {
             .defaultValue(SaslMechanism.AWS_MSK_IAM)
             .build();
 
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = 
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+    private static final int MIN_SESSION_DURATION_SECONDS = 900;
+    private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity Session Time")
+            .description("Session time in seconds for AWS STS 
AssumeRoleWithWebIdentity (between 900 and 3600 seconds).")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .defaultValue(String.valueOf(MAX_SESSION_DURATION_SECONDS))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Region")
+            .description("Region identifier used for the AWS Security Token 
Service when exchanging Web Identity tokens.")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Endpoint Override")

Review Comment:
   Recommend removing `Override`.
   ```suggestion
               .name("AWS Web Identity STS Endpoint")
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKKafkaProperties;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Callback handler that supplies AWS credentials sourced from NiFi 
configuration for AWS MSK IAM authentication.
+ */
+public class AmazonMSKCredentialsCallbackHandler implements 
AuthenticateCallbackHandler {
+
+    private AwsCredentialsProvider credentialsProvider;
+
+    @Override
+    public void configure(final Map<String, ?> configs, final String 
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!IAMLoginModule.MECHANISM.equals(saslMechanism)) {
+            throw new IllegalArgumentException("Unexpected SASL mechanism: " + 
saslMechanism);
+        }
+
+        final Object provider = 
configs.get(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER);
+        if (!(provider instanceof AwsCredentialsProvider)) {
+            throw new IllegalArgumentException("Kafka configuration missing 
AWS Web Identity credentials provider");
+        }
+
+        credentialsProvider = (AwsCredentialsProvider) provider;
+
+        final AppConfigurationEntry loginModuleEntry = 
jaasConfigEntries.stream()
+                .filter(entry -> 
IAMLoginModule.class.getName().equals(entry.getLoginModuleName()))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("JAAS 
configuration missing IAMLoginModule entry"));
+
+        final Map<String, ?> jaasOptions = loginModuleEntry.getOptions();
+        final Object roleArn = jaasOptions.get("awsRoleArn");
+        final Object roleSessionName = jaasOptions.get("awsRoleSessionName");
+        if (roleArn == null || roleSessionName == null) {
+            throw new IllegalStateException("JAAS configuration missing 
required awsRoleArn or awsRoleSessionName options for Web Identity 
authentication");
+        }
+    }
+
+    @Override
+    public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        for (final Callback callback : callbacks) {
+            if (callback instanceof AWSCredentialsCallback awsCredentialsCallback) {
+                handleCredentialsCallback(awsCredentialsCallback);
+            } else {
+                throw new UnsupportedCallbackException(callback, String.format("Unsupported callback type [%s]", callback.getClass().getName()));

Review Comment:
   ```suggestion
                   throw new UnsupportedCallbackException(callback, "Unsupported callback type [%s]".formatted(callback.getClass().getName()));
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKKafkaProperties;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Callback handler that supplies AWS credentials sourced from NiFi 
configuration for AWS MSK IAM authentication.
+ */
+public class AmazonMSKCredentialsCallbackHandler implements 
AuthenticateCallbackHandler {
+
+    private AwsCredentialsProvider credentialsProvider;
+
+    @Override
+    public void configure(final Map<String, ?> configs, final String 
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+        if (!IAMLoginModule.MECHANISM.equals(saslMechanism)) {
+            throw new IllegalArgumentException("Unexpected SASL mechanism: " + 
saslMechanism);
+        }
+
+        final Object provider = 
configs.get(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER);
+        if (!(provider instanceof AwsCredentialsProvider)) {
+            throw new IllegalArgumentException("Kafka configuration missing 
AWS Web Identity credentials provider");
+        }
+
+        credentialsProvider = (AwsCredentialsProvider) provider;
+
+        final AppConfigurationEntry loginModuleEntry = 
jaasConfigEntries.stream()
+                .filter(entry -> 
IAMLoginModule.class.getName().equals(entry.getLoginModuleName()))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("JAAS 
configuration missing IAMLoginModule entry"));
+
+        final Map<String, ?> jaasOptions = loginModuleEntry.getOptions();
+        final Object roleArn = jaasOptions.get("awsRoleArn");
+        final Object roleSessionName = jaasOptions.get("awsRoleSessionName");
+        if (roleArn == null || roleSessionName == null) {
+            throw new IllegalStateException("JAAS configuration missing 
required awsRoleArn or awsRoleSessionName options for Web Identity 
authentication");
+        }
+    }
+
+    @Override
+    public void handle(final Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
+        for (final Callback callback : callbacks) {
+            if (callback instanceof AWSCredentialsCallback 
awsCredentialsCallback) {
+                handleCredentialsCallback(awsCredentialsCallback);
+            } else {
+                throw new UnsupportedCallbackException(callback, 
String.format("Unsupported callback type [%s]", callback.getClass().getName()));
+            }
+        }
+    }
+
+    private void handleCredentialsCallback(final AWSCredentialsCallback 
callback) {
+        try {
+            final AwsCredentials awsCredentials = 
credentialsProvider.resolveCredentials();
+            callback.setAwsCredentials(awsCredentials);
+        } catch (final Exception e) {
+            callback.setLoadingException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // No resources to close; credentials provider lifecycle managed by 
NiFi service

Review Comment:
   Recommend using a method comment for this instead of the inline comment.
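
   One possible shape for this, as a sketch only: the class name is hypothetical and `AutoCloseable` stands in for the Kafka `AuthenticateCallbackHandler` interface so that the example compiles on its own.

   ```java
   // Hypothetical stand-in for the callback handler, for illustration only
   class NoOpCloseSketch implements AutoCloseable {

       /**
        * No resources to close because the credentials provider lifecycle is managed by the NiFi service
        */
       @Override
       public void close() {

       }
   }
   ```

   The explanation moves from the method body to the method comment, leaving the body empty.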



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
         return supportedPropertyDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+
+        final AwsRoleSource roleSource = 
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+        if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+            if 
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+                results.add(new ValidationResult.Builder()
+                        
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+                        .valid(false)
+                        .explanation("AWS Web Identity Token Provider must be 
configured when AWS Role Source is set to Web Identity Provider")
+                        .build());
+            }
+
+            final PropertyValue sessionTimeProperty = 
validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);
+            if (sessionTimeProperty != null && sessionTimeProperty.isSet()) {
+                final Integer sessionSeconds = sessionTimeProperty.asInteger();
+                if (sessionSeconds == null || sessionSeconds < 
MIN_SESSION_DURATION_SECONDS || sessionSeconds > MAX_SESSION_DURATION_SECONDS) {
+                    results.add(new ValidationResult.Builder()
+                            
.subject(AWS_WEB_IDENTITY_SESSION_TIME.getDisplayName())
+                            .valid(false)
+                            .explanation(String.format("Session time must be 
between %d and %d seconds", MIN_SESSION_DURATION_SECONDS, 
MAX_SESSION_DURATION_SECONDS))
+                            .build());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    protected void customizeKafkaProperties(final Properties properties, final 
PropertyContext propertyContext) {

Review Comment:
   After refactoring the visibility of the `get...Properties()` methods, this 
could still be a shared method named something like 
`setAuthenticationProperties`.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -42,6 +73,56 @@ public class AmazonMSKConnectionService extends 
Kafka3ConnectionService {
             .defaultValue(SaslMechanism.AWS_MSK_IAM)
             .build();
 
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = 
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+    private static final int MIN_SESSION_DURATION_SECONDS = 900;
+    private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity Session Time")
+            .description("Session time in seconds for AWS STS 
AssumeRoleWithWebIdentity (between 900 and 3600 seconds).")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .defaultValue(String.valueOf(MAX_SESSION_DURATION_SECONDS))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Region")
+            .description("Region identifier used for the AWS Security Token 
Service when exchanging Web Identity tokens.")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("AWS Web Identity STS Endpoint Override")
+            .description("Optional endpoint override for the AWS Security 
Token Service.")
+            .dependsOn(
+                    KafkaClientComponent.AWS_ROLE_SOURCE,
+                    AwsRoleSource.WEB_IDENTITY_TOKEN
+            )
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor 
AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("AWS Web Identity SSL Context Service")

Review Comment:
   ```suggestion
       public static final PropertyDescriptor 
AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER = new PropertyDescriptor.Builder()
               .name("AWS Web Identity SSL Context Provider")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


