exceptionfactory commented on code in PR #10489:
URL: https://github.com/apache/nifi/pull/10489#discussion_r2506960226
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java:
##########
@@ -152,12 +153,24 @@ public interface KafkaClientComponent {
.required(true)
.dependsOn(
KafkaClientComponent.AWS_ROLE_SOURCE,
- AwsRoleSource.SPECIFIED_ROLE
+ AwsRoleSource.SPECIFIED_ROLE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
+ PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity Token Provider")
+ .description("Controller Service providing OAuth2/OIDC tokens for
AWS Web Identity federation.")
Review Comment:
```suggestion
.description("Controller Service providing tokens with OAuth2
OpenID Connect for AWS Web Identity federation.")
```
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java:
##########
@@ -400,6 +406,17 @@ private void setSslProperties(final Properties properties,
final PropertyContext
}
}
+ /**
+ * Hook for subclasses to customize Kafka client, producer, or consumer
properties after the standard
+ * configuration has been applied.
+ *
+ * @param properties Kafka client properties to update
+ * @param propertyContext NiFi property context with component
configuration
+ */
+ protected void customizeKafkaProperties(final Properties properties, final
PropertyContext propertyContext) {
Review Comment:
Rather than implementing a callback approach for extension, which is ambiguous
about the particular type of properties being customized (client, producer, or
consumer), I recommend changing the visibility of the `get...Properties()`
methods to `protected`. That would allow subclasses to call the super method,
and then make adjustments, while preserving the type of properties requested.
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/aws/AmazonMSKKafkaProperties.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.aws;
+
+/**
+ * AWS MSK-specific Kafka property keys shared between components.
+ */
+public final class AmazonMSKKafkaProperties {
Review Comment:
`Kafka` is redundant in the name. Rather than using `public static final` in
a class, I recommend changing this to an enum named `AmazonMSKProperty` and
having a `getProperty()` public method that returns the values.
```suggestion
public enum AmazonMSKProperty {
```
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
return supportedPropertyDescriptors;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final Collection<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final AwsRoleSource roleSource =
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ if
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+ .valid(false)
+ .explanation("AWS Web Identity Token Provider must be
configured when AWS Role Source is set to Web Identity Provider")
+ .build());
+ }
+
+ final PropertyValue sessionTimeProperty =
validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);
+ if (sessionTimeProperty != null && sessionTimeProperty.isSet()) {
+ final Integer sessionSeconds = sessionTimeProperty.asInteger();
+ if (sessionSeconds == null || sessionSeconds <
MIN_SESSION_DURATION_SECONDS || sessionSeconds > MAX_SESSION_DURATION_SECONDS) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_SESSION_TIME.getDisplayName())
+ .valid(false)
+ .explanation(String.format("Session time must be
between %d and %d seconds", MIN_SESSION_DURATION_SECONDS,
MAX_SESSION_DURATION_SECONDS))
+ .build());
+ }
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ protected void customizeKafkaProperties(final Properties properties, final
PropertyContext propertyContext) {
+ final AwsRoleSource roleSource =
propertyContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ final AwsCredentialsProvider credentialsProvider =
createWebIdentityCredentialsProvider(propertyContext);
+
properties.put(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER,
credentialsProvider);
+ }
+ }
+
+ private AwsCredentialsProvider createWebIdentityCredentialsProvider(final
PropertyContext propertyContext) {
+ final OAuth2AccessTokenProvider tokenProvider =
propertyContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ if (tokenProvider == null) {
+ throw new IllegalStateException("AWS Web Identity Token Provider
is required when AWS Role Source is set to Web Identity Provider");
+ }
+
+ final String roleArn =
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+ final String roleSessionName =
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+ final Integer sessionSeconds =
propertyContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME).asInteger();
+ final String stsRegionId =
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_REGION).getValue();
+ final String stsEndpoint =
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_ENDPOINT).getValue();
+ final SSLContextProvider sslContextProvider =
propertyContext.getProperty(AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
+
+ final ApacheHttpClient.Builder httpClientBuilder =
ApacheHttpClient.builder();
+
+ if (sslContextProvider != null) {
+ final SSLContext sslContext = sslContextProvider.createContext();
+ httpClientBuilder.socketFactory(new
SSLConnectionSocketFactory(sslContext));
+ }
+
+ final StsClientBuilder stsClientBuilder =
StsClient.builder().httpClient(httpClientBuilder.build());
+
+ if (!StringUtils.isBlank(stsRegionId)) {
+ stsClientBuilder.region(Region.of(stsRegionId));
+ }
+
+ if (!StringUtils.isBlank(stsEndpoint)) {
+ stsClientBuilder.endpointOverride(URI.create(stsEndpoint));
+ }
+
+ final StsClient stsClient = stsClientBuilder.build();
+
+ return new WebIdentityCredentialsProvider(stsClient, tokenProvider,
roleArn, roleSessionName, sessionSeconds);
+ }
+
+ private static final class WebIdentityCredentialsProvider implements
AwsCredentialsProvider, AutoCloseable {
+ private static final Duration SKEW = Duration.ofSeconds(60);
+
+ private final StsClient stsClient;
+ private final OAuth2AccessTokenProvider tokenProvider;
+ private final String roleArn;
+ private final String roleSessionName;
+ private final Integer sessionSeconds;
+
+ private volatile AwsSessionCredentials cachedCredentials;
+ private volatile Instant expiration;
+
+ private WebIdentityCredentialsProvider(final StsClient stsClient,
+ final OAuth2AccessTokenProvider
tokenProvider,
+ final String roleArn,
+ final String roleSessionName,
+ final Integer sessionSeconds) {
+ this.stsClient = Objects.requireNonNull(stsClient, "stsClient
required");
+ this.tokenProvider = Objects.requireNonNull(tokenProvider,
"tokenProvider required");
+ this.roleArn = Objects.requireNonNull(roleArn, "roleArn required");
+ this.roleSessionName = Objects.requireNonNull(roleSessionName,
"roleSessionName required");
+ this.sessionSeconds = sessionSeconds;
+ }
+
+ @Override
+ public AwsCredentials resolveCredentials() {
+ final Instant now = Instant.now();
+ final AwsSessionCredentials currentCredentials = cachedCredentials;
+ final Instant currentExpiration = expiration;
+
+ if (currentCredentials != null && currentExpiration != null &&
now.isBefore(currentExpiration.minus(SKEW))) {
+ return currentCredentials;
Review Comment:
Recommend refactoring to a single return. It may also be useful to add
another private method for evaluating the expiration that encapsulates null
checking.
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
return supportedPropertyDescriptors;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final Collection<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final AwsRoleSource roleSource =
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ if
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+ .valid(false)
+ .explanation("AWS Web Identity Token Provider must be
configured when AWS Role Source is set to Web Identity Provider")
+ .build());
+ }
+
+ final PropertyValue sessionTimeProperty =
validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);
Review Comment:
Can `StandardValidators.createTimePeriodValidator()` be used to set
minimum and maximum values?
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -42,6 +73,56 @@ public class AmazonMSKConnectionService extends
Kafka3ConnectionService {
.defaultValue(SaslMechanism.AWS_MSK_IAM)
.build();
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER =
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+ private static final int MIN_SESSION_DURATION_SECONDS = 900;
+ private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity Session Time")
+ .description("Session time in seconds for AWS STS
AssumeRoleWithWebIdentity (between 900 and 3600 seconds).")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .defaultValue(String.valueOf(MAX_SESSION_DURATION_SECONDS))
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Region")
+ .description("Region identifier used for the AWS Security Token
Service when exchanging Web Identity tokens.")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Endpoint Override")
+ .description("Optional endpoint override for the AWS Security
Token Service.")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor
AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("AWS Web Identity SSL Context Service")
+ .description("SSL Context Service used when communicating with AWS
STS for Web Identity federation.")
+ .identifiesControllerService(SSLContextService.class)
Review Comment:
This should use `SSLContextProvider`:
```suggestion
.identifiesControllerService(SSLContextProvider.class)
```
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
return supportedPropertyDescriptors;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final Collection<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final AwsRoleSource roleSource =
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ if
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+ .valid(false)
+ .explanation("AWS Web Identity Token Provider must be
configured when AWS Role Source is set to Web Identity Provider")
+ .build());
+ }
+
+ final PropertyValue sessionTimeProperty =
validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);
+ if (sessionTimeProperty != null && sessionTimeProperty.isSet()) {
+ final Integer sessionSeconds = sessionTimeProperty.asInteger();
+ if (sessionSeconds == null || sessionSeconds <
MIN_SESSION_DURATION_SECONDS || sessionSeconds > MAX_SESSION_DURATION_SECONDS) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_SESSION_TIME.getDisplayName())
+ .valid(false)
+ .explanation(String.format("Session time must be
between %d and %d seconds", MIN_SESSION_DURATION_SECONDS,
MAX_SESSION_DURATION_SECONDS))
+ .build());
+ }
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ protected void customizeKafkaProperties(final Properties properties, final
PropertyContext propertyContext) {
+ final AwsRoleSource roleSource =
propertyContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ final AwsCredentialsProvider credentialsProvider =
createWebIdentityCredentialsProvider(propertyContext);
+
properties.put(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER,
credentialsProvider);
+ }
+ }
+
+ private AwsCredentialsProvider createWebIdentityCredentialsProvider(final
PropertyContext propertyContext) {
+ final OAuth2AccessTokenProvider tokenProvider =
propertyContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ if (tokenProvider == null) {
+ throw new IllegalStateException("AWS Web Identity Token Provider
is required when AWS Role Source is set to Web Identity Provider");
+ }
+
+ final String roleArn =
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_ARN).getValue();
+ final String roleSessionName =
propertyContext.getProperty(KafkaClientComponent.AWS_ASSUME_ROLE_SESSION_NAME).getValue();
+ final Integer sessionSeconds =
propertyContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME).asInteger();
+ final String stsRegionId =
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_REGION).getValue();
+ final String stsEndpoint =
propertyContext.getProperty(AWS_WEB_IDENTITY_STS_ENDPOINT).getValue();
+ final SSLContextProvider sslContextProvider =
propertyContext.getProperty(AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
+
+ final ApacheHttpClient.Builder httpClientBuilder =
ApacheHttpClient.builder();
+
+ if (sslContextProvider != null) {
+ final SSLContext sslContext = sslContextProvider.createContext();
+ httpClientBuilder.socketFactory(new
SSLConnectionSocketFactory(sslContext));
+ }
+
+ final StsClientBuilder stsClientBuilder =
StsClient.builder().httpClient(httpClientBuilder.build());
+
+ if (!StringUtils.isBlank(stsRegionId)) {
+ stsClientBuilder.region(Region.of(stsRegionId));
+ }
+
+ if (!StringUtils.isBlank(stsEndpoint)) {
+ stsClientBuilder.endpointOverride(URI.create(stsEndpoint));
+ }
+
+ final StsClient stsClient = stsClientBuilder.build();
+
+ return new WebIdentityCredentialsProvider(stsClient, tokenProvider,
roleArn, roleSessionName, sessionSeconds);
+ }
+
+ private static final class WebIdentityCredentialsProvider implements
AwsCredentialsProvider, AutoCloseable {
+ private static final Duration SKEW = Duration.ofSeconds(60);
+
+ private final StsClient stsClient;
+ private final OAuth2AccessTokenProvider tokenProvider;
+ private final String roleArn;
+ private final String roleSessionName;
+ private final Integer sessionSeconds;
+
+ private volatile AwsSessionCredentials cachedCredentials;
+ private volatile Instant expiration;
+
+ private WebIdentityCredentialsProvider(final StsClient stsClient,
+ final OAuth2AccessTokenProvider
tokenProvider,
+ final String roleArn,
+ final String roleSessionName,
+ final Integer sessionSeconds) {
+ this.stsClient = Objects.requireNonNull(stsClient, "stsClient
required");
+ this.tokenProvider = Objects.requireNonNull(tokenProvider,
"tokenProvider required");
+ this.roleArn = Objects.requireNonNull(roleArn, "roleArn required");
+ this.roleSessionName = Objects.requireNonNull(roleSessionName,
"roleSessionName required");
+ this.sessionSeconds = sessionSeconds;
+ }
+
+ @Override
+ public AwsCredentials resolveCredentials() {
+ final Instant now = Instant.now();
+ final AwsSessionCredentials currentCredentials = cachedCredentials;
+ final Instant currentExpiration = expiration;
+
+ if (currentCredentials != null && currentExpiration != null &&
now.isBefore(currentExpiration.minus(SKEW))) {
+ return currentCredentials;
+ }
+
+ synchronized (this) {
+ if (cachedCredentials != null && expiration != null &&
Instant.now().isBefore(expiration.minus(SKEW))) {
+ return cachedCredentials;
+ }
+
+ final String webIdentityToken = getWebIdentityToken();
+
+ final AssumeRoleWithWebIdentityRequest.Builder requestBuilder
= AssumeRoleWithWebIdentityRequest.builder()
+ .roleArn(roleArn)
+ .roleSessionName(roleSessionName)
+ .webIdentityToken(webIdentityToken);
+
+ if (sessionSeconds != null) {
+ requestBuilder.durationSeconds(sessionSeconds);
+ }
+
+ final AssumeRoleWithWebIdentityResponse response =
stsClient.assumeRoleWithWebIdentity(requestBuilder.build());
+ final Credentials temporaryCredentials =
response.credentials();
+ final AwsSessionCredentials sessionCredentials =
AwsSessionCredentials.create(
+ temporaryCredentials.accessKeyId(),
temporaryCredentials.secretAccessKey(), temporaryCredentials.sessionToken());
+
+ cachedCredentials = sessionCredentials;
+ expiration = temporaryCredentials.expiration();
+ return sessionCredentials;
+ }
+ }
+
+ private String getWebIdentityToken() {
+ final AccessToken accessToken = tokenProvider.getAccessDetails();
+ if (accessToken == null) {
+ throw new IllegalStateException("OAuth2AccessTokenProvider
returned null AccessToken");
+ }
+
+ final Map<String, Object> additionalParameters =
accessToken.getAdditionalParameters();
+ if (additionalParameters != null) {
+ final Object idTokenValue =
additionalParameters.get("id_token");
+ if (idTokenValue instanceof String idToken &&
!StringUtils.isBlank(idToken)) {
+ return idToken;
+ }
+ if (idTokenValue instanceof String idTokenBlank &&
StringUtils.isBlank(idTokenBlank)) {
+ throw new IllegalStateException("OAuth2AccessTokenProvider
returned an empty id_token");
+ }
+ }
+
+ final String accessTokenValue = accessToken.getAccessToken();
+ if (StringUtils.isBlank(accessTokenValue)) {
+ throw new IllegalStateException("No usable token found in
AccessToken (id_token or access_token)");
+ }
+
+ return accessTokenValue;
Review Comment:
For clarity of implementation, I recommend refactoring to a single return.
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -42,6 +73,56 @@ public class AmazonMSKConnectionService extends
Kafka3ConnectionService {
.defaultValue(SaslMechanism.AWS_MSK_IAM)
.build();
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER =
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+ private static final int MIN_SESSION_DURATION_SECONDS = 900;
+ private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity Session Time")
+ .description("Session time in seconds for AWS STS
AssumeRoleWithWebIdentity (between 900 and 3600 seconds).")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .defaultValue(String.valueOf(MAX_SESSION_DURATION_SECONDS))
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Region")
+ .description("Region identifier used for the AWS Security Token
Service when exchanging Web Identity tokens.")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Endpoint Override")
Review Comment:
Recommend removing `Override`.
```suggestion
.name("AWS Web Identity STS Endpoint")
```
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKKafkaProperties;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Callback handler that supplies AWS credentials sourced from NiFi
configuration for AWS MSK IAM authentication.
+ */
+public class AmazonMSKCredentialsCallbackHandler implements
AuthenticateCallbackHandler {
+
+ private AwsCredentialsProvider credentialsProvider;
+
+ @Override
+ public void configure(final Map<String, ?> configs, final String
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+ if (!IAMLoginModule.MECHANISM.equals(saslMechanism)) {
+ throw new IllegalArgumentException("Unexpected SASL mechanism: " +
saslMechanism);
+ }
+
+ final Object provider =
configs.get(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER);
+ if (!(provider instanceof AwsCredentialsProvider)) {
+ throw new IllegalArgumentException("Kafka configuration missing
AWS Web Identity credentials provider");
+ }
+
+ credentialsProvider = (AwsCredentialsProvider) provider;
+
+ final AppConfigurationEntry loginModuleEntry =
jaasConfigEntries.stream()
+ .filter(entry ->
IAMLoginModule.class.getName().equals(entry.getLoginModuleName()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("JAAS
configuration missing IAMLoginModule entry"));
+
+ final Map<String, ?> jaasOptions = loginModuleEntry.getOptions();
+ final Object roleArn = jaasOptions.get("awsRoleArn");
+ final Object roleSessionName = jaasOptions.get("awsRoleSessionName");
+ if (roleArn == null || roleSessionName == null) {
+ throw new IllegalStateException("JAAS configuration missing
required awsRoleArn or awsRoleSessionName options for Web Identity
authentication");
+ }
+ }
+
+ @Override
+ public void handle(final Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
+ for (final Callback callback : callbacks) {
+ if (callback instanceof AWSCredentialsCallback
awsCredentialsCallback) {
+ handleCredentialsCallback(awsCredentialsCallback);
+ } else {
+ throw new UnsupportedCallbackException(callback,
String.format("Unsupported callback type [%s]", callback.getClass().getName()));
Review Comment:
```suggestion
throw new UnsupportedCallbackException(callback,
"Unsupported callback type [%s]".formatted(callback.getClass().getName()));
```
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKCredentialsCallbackHandler.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.aws;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.nifi.kafka.shared.aws.AmazonMSKKafkaProperties;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.msk.auth.iam.IAMLoginModule;
+import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Callback handler that supplies AWS credentials sourced from NiFi
configuration for AWS MSK IAM authentication.
+ */
+public class AmazonMSKCredentialsCallbackHandler implements
AuthenticateCallbackHandler {
+
+ private AwsCredentialsProvider credentialsProvider;
+
+ @Override
+ public void configure(final Map<String, ?> configs, final String
saslMechanism, final List<AppConfigurationEntry> jaasConfigEntries) {
+ if (!IAMLoginModule.MECHANISM.equals(saslMechanism)) {
+ throw new IllegalArgumentException("Unexpected SASL mechanism: " +
saslMechanism);
+ }
+
+ final Object provider =
configs.get(AmazonMSKKafkaProperties.NIFI_AWS_MSK_CREDENTIALS_PROVIDER);
+ if (!(provider instanceof AwsCredentialsProvider)) {
+ throw new IllegalArgumentException("Kafka configuration missing
AWS Web Identity credentials provider");
+ }
+
+ credentialsProvider = (AwsCredentialsProvider) provider;
+
+ final AppConfigurationEntry loginModuleEntry =
jaasConfigEntries.stream()
+ .filter(entry ->
IAMLoginModule.class.getName().equals(entry.getLoginModuleName()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("JAAS
configuration missing IAMLoginModule entry"));
+
+ final Map<String, ?> jaasOptions = loginModuleEntry.getOptions();
+ final Object roleArn = jaasOptions.get("awsRoleArn");
+ final Object roleSessionName = jaasOptions.get("awsRoleSessionName");
+ if (roleArn == null || roleSessionName == null) {
+ throw new IllegalStateException("JAAS configuration missing
required awsRoleArn or awsRoleSessionName options for Web Identity
authentication");
+ }
+ }
+
+ @Override
+ public void handle(final Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
+ for (final Callback callback : callbacks) {
+ if (callback instanceof AWSCredentialsCallback
awsCredentialsCallback) {
+ handleCredentialsCallback(awsCredentialsCallback);
+ } else {
+ throw new UnsupportedCallbackException(callback,
String.format("Unsupported callback type [%s]", callback.getClass().getName()));
+ }
+ }
+ }
+
+ private void handleCredentialsCallback(final AWSCredentialsCallback
callback) {
+ try {
+ final AwsCredentials awsCredentials =
credentialsProvider.resolveCredentials();
+ callback.setAwsCredentials(awsCredentials);
+ } catch (final Exception e) {
+ callback.setLoadingException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ // No resources to close; credentials provider lifecycle managed by
NiFi service
Review Comment:
Recommend using a method comment for this instead of the inline comment.
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -69,6 +155,172 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
return supportedPropertyDescriptors;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final Collection<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final AwsRoleSource roleSource =
validationContext.getProperty(KafkaClientComponent.AWS_ROLE_SOURCE).asAllowableValue(AwsRoleSource.class);
+ if (roleSource == AwsRoleSource.WEB_IDENTITY_TOKEN) {
+ if
(!validationContext.getProperty(AWS_WEB_IDENTITY_TOKEN_PROVIDER).isSet()) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_TOKEN_PROVIDER.getDisplayName())
+ .valid(false)
+ .explanation("AWS Web Identity Token Provider must be
configured when AWS Role Source is set to Web Identity Provider")
+ .build());
+ }
+
+ final PropertyValue sessionTimeProperty =
validationContext.getProperty(AWS_WEB_IDENTITY_SESSION_TIME);
+ if (sessionTimeProperty != null && sessionTimeProperty.isSet()) {
+ final Integer sessionSeconds = sessionTimeProperty.asInteger();
+ if (sessionSeconds == null || sessionSeconds <
MIN_SESSION_DURATION_SECONDS || sessionSeconds > MAX_SESSION_DURATION_SECONDS) {
+ results.add(new ValidationResult.Builder()
+
.subject(AWS_WEB_IDENTITY_SESSION_TIME.getDisplayName())
+ .valid(false)
+ .explanation(String.format("Session time must be
between %d and %d seconds", MIN_SESSION_DURATION_SECONDS,
MAX_SESSION_DURATION_SECONDS))
+ .build());
+ }
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ protected void customizeKafkaProperties(final Properties properties, final
PropertyContext propertyContext) {
Review Comment:
After refactoring the visibility of the `get...Properties()` methods, this
could still be a shared method named something like
`setAuthenticationProperties`.
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-aws/src/main/java/org/apache/nifi/kafka/service/aws/AmazonMSKConnectionService.java:
##########
@@ -42,6 +73,56 @@ public class AmazonMSKConnectionService extends
Kafka3ConnectionService {
.defaultValue(SaslMechanism.AWS_MSK_IAM)
.build();
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_TOKEN_PROVIDER =
KafkaClientComponent.AWS_WEB_IDENTITY_TOKEN_PROVIDER;
+
+ private static final int MIN_SESSION_DURATION_SECONDS = 900;
+ private static final int MAX_SESSION_DURATION_SECONDS = 3600;
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_SESSION_TIME = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity Session Time")
+ .description("Session time in seconds for AWS STS
AssumeRoleWithWebIdentity (between 900 and 3600 seconds).")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .defaultValue(String.valueOf(MAX_SESSION_DURATION_SECONDS))
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_REGION = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Region")
+ .description("Region identifier used for the AWS Security Token
Service when exchanging Web Identity tokens.")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor AWS_WEB_IDENTITY_STS_ENDPOINT = new
PropertyDescriptor.Builder()
+ .name("AWS Web Identity STS Endpoint Override")
+ .description("Optional endpoint override for the AWS Security
Token Service.")
+ .dependsOn(
+ KafkaClientComponent.AWS_ROLE_SOURCE,
+ AwsRoleSource.WEB_IDENTITY_TOKEN
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor
AWS_WEB_IDENTITY_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("AWS Web Identity SSL Context Service")
Review Comment:
```suggestion
public static final PropertyDescriptor
AWS_WEB_IDENTITY_SSL_CONTEXT_PROVIDER = new PropertyDescriptor.Builder()
.name("AWS Web Identity SSL Context Provider")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]