This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b8eb7e23c4 NIFI-15161 - Add support for Azure Federated Identity
Credentials
b8eb7e23c4 is described below
commit b8eb7e23c408f33f631a8dd92db4d934335e5d21
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Oct 30 21:58:20 2025 +0100
NIFI-15161 - Add support for Azure Federated Identity Credentials
This closes #10482.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi-azure-processors/pom.xml | 11 +-
.../azure/eventhub/ConsumeAzureEventHub.java | 95 ++++++---------
.../azure/eventhub/GetAzureEventHub.java | 12 +-
.../azure/eventhub/PutAzureEventHub.java | 12 +-
.../azure/eventhub/utils/AzureEventHubUtils.java | 98 ++++-----------
.../azure/storage/CopyAzureBlobStorage_v12.java | 11 +-
.../queue/AbstractAzureQueueStorage_v12.java | 11 +-
.../azure/storage/utils/AzureStorageUtils.java | 12 +-
.../storage/utils/BlobServiceClientFactory.java | 14 ++-
.../utils/DataLakeServiceClientFactory.java | 4 +
.../StandardAzureCredentialsControllerService.java | 30 ++++-
...andardAzureIdentityFederationTokenProvider.java | 135 +++++++++++++++++++++
.../storage/ADLSCredentialsControllerService.java | 9 ++
...ureStorageCredentialsControllerService_v12.java | 14 ++-
.../util/AzureWorkloadIdentityCredentialUtils.java | 130 ++++++++++++++++++++
.../AzureEventHubAuthenticationStrategy.java | 3 +-
.../azure/eventhubs/AzureEventHubComponent.java | 9 ++
.../BlobStorageAuthenticationStrategy.java | 3 +-
.../org.apache.nifi.controller.ControllerService | 1 +
.../additionalDetails.md | 55 +++++++++
.../additionalDetails.md | 8 +-
.../azure/eventhub/GetAzureEventHubTest.java | 23 ++++
.../azure/eventhub/PutAzureEventHubTest.java | 23 ++++
.../azure/eventhub/TestConsumeAzureEventHub.java | 48 +++++++-
.../azure/MockIdentityFederationTokenProvider.java | 34 ++++++
...tStandardAzureCredentialsControllerService.java | 40 +++++-
...andardAzureIdentityFederationTokenProvider.java | 94 ++++++++++++++
.../TestADLSCredentialsControllerService.java | 44 +++++++
...ureStorageCredentialsControllerService_v12.java | 48 ++++++++
.../TestAzureWorkloadIdentityCredentialUtils.java | 125 +++++++++++++++++++
.../AzureIdentityFederationTokenProvider.java | 36 ++++++
.../azure/storage/ADLSCredentialsDetails.java | 18 ++-
.../AzureStorageCredentialsDetails_v12.java | 37 ++++--
.../azure/storage/AzureStorageCredentialsType.java | 3 +-
34 files changed, 1079 insertions(+), 171 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 494434ca0a..f6dc41b1a6 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -26,6 +26,12 @@
<artifactId>nifi-utils</artifactId>
</dependency>
+ <!-- OAuth2 Access Token Provider API for Web Identity (OIDC) support
-->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-service-utils</artifactId>
@@ -52,10 +58,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-oauth2-provider-api</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
@@ -69,7 +71,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-services-api</artifactId>
<version>2.9.0-SNAPSHOT</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 2510814761..beb83b797c 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -81,6 +81,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
@@ -151,6 +152,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER =
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
+ static final PropertyDescriptor
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER =
AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY_NAME = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Listen claims.")
@@ -268,6 +270,13 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.required(true)
.dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.OAUTH2)
.build();
+ static final PropertyDescriptor
BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("Storage Identity Federation Token Provider")
+ .description("Controller Service exchanging workload identity
tokens for Azure AD access tokens when using Identity Federation with Azure
Blob Storage.")
+
.identifiesControllerService(AzureIdentityFederationTokenProvider.class)
+ .required(true)
+ .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION)
+ .build();
static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new
PropertyDescriptor.Builder()
.name("Storage Account Key")
.description("The Azure Storage account key to store event hub
consumer group state.")
@@ -327,6 +336,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
+ EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
CONSUMER_GROUP,
RECORD_READER,
RECORD_WRITER,
@@ -341,6 +351,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
STORAGE_ACCOUNT_KEY,
STORAGE_SAS_TOKEN,
BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
+ BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER,
PROXY_CONFIGURATION_SERVICE
);
@@ -436,7 +447,6 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
final String storageAccountKey =
validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken =
validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
final CheckpointStrategy checkpointStrategy =
CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue());
- final boolean blobOauthProviderSet =
validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet();
if ((recordReader != null && recordWriter == null) || (recordReader ==
null && recordWriter != null)) {
results.add(new ValidationResult.Builder()
@@ -449,10 +459,10 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
final BlobStorageAuthenticationStrategy
blobStorageAuthenticationStrategy =
-
validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY)
-
.asAllowableValue(BlobStorageAuthenticationStrategy.class);
+
validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);
if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) {
+ // needed because of expression language support
if (StringUtils.isBlank(storageAccountKey)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_ACCOUNT_KEY.getDisplayName())
@@ -463,18 +473,8 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.valid(false)
.build());
}
-
- if (StringUtils.isNotBlank(storageSasToken)) {
- results.add(new ValidationResult.Builder()
- .subject(STORAGE_SAS_TOKEN.getDisplayName())
- .explanation("%s must not be set when %s is %s."
-
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
- .valid(false)
- .build());
- }
} else if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE) {
+ // needed because of expression language support
if (StringUtils.isBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_SAS_TOKEN.getDisplayName())
@@ -485,53 +485,15 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.valid(false)
.build());
}
-
- if (StringUtils.isNotBlank(storageAccountKey)) {
- results.add(new ValidationResult.Builder()
- .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
- .explanation("%s must not be set when %s is %s."
-
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
- .valid(false)
- .build());
- }
} else if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.OAUTH2) {
- if (!blobOauthProviderSet) {
- results.add(new ValidationResult.Builder()
-
.subject(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName())
- .explanation("%s must be set when %s is %s."
-
.formatted(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
- .valid(false)
- .build());
- }
-
- if (StringUtils.isNotBlank(storageAccountKey)) {
- results.add(new ValidationResult.Builder()
- .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
- .explanation("%s must not be set when %s is %s."
-
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
- .valid(false)
- .build());
- }
-
- if (StringUtils.isNotBlank(storageSasToken)) {
- results.add(new ValidationResult.Builder()
- .subject(STORAGE_SAS_TOKEN.getDisplayName())
- .explanation("%s must not be set when %s is %s."
-
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
- .valid(false)
- .build());
- }
+ // Rely on required property + dependsOn validation to ensure
provider is configured
+ } else if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION) {
+ // Rely on required property + dependsOn validation to ensure
provider is configured
}
}
- results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext));
+
+ results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY, validationContext));
+
return results;
}
@@ -629,6 +591,14 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
blobContainerClientBuilder.endpoint(endpoint);
blobContainerClientBuilder.credential(tokenCredential);
}
+ case IDENTITY_FEDERATION -> {
+ final AzureIdentityFederationTokenProvider tokenProvider =
+
context.getProperty(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER).asControllerService(AzureIdentityFederationTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+ final String endpoint =
createBlobEndpoint(storageAccountName, domainName);
+ blobContainerClientBuilder.endpoint(endpoint);
+ blobContainerClientBuilder.credential(tokenCredential);
+ }
}
blobContainerClientBuilder.containerName(containerName);
@@ -684,6 +654,13 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName,
tokenCredential);
}
+ case IDENTITY_FEDERATION -> {
+ final AzureIdentityFederationTokenProvider tokenProvider =
+
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
+
.asControllerService(AzureIdentityFederationTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName,
tokenCredential);
+ }
}
final Integer prefetchCount =
context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
@@ -923,7 +900,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY,
storageAccountName, storageAccountKey, domainName);
case SHARED_ACCESS_SIGNATURE ->
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN,
storageAccountName, domainName, storageSasToken);
- case OAUTH2 -> throw new IllegalArgumentException(String.format(
+ case OAUTH2, IDENTITY_FEDERATION -> throw new
IllegalArgumentException(String.format(
"Blob Storage Authentication Strategy %s does not support
connection string authentication", blobStorageAuthenticationStrategy));
};
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 55b50805f1..1cfb1808a5 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -58,6 +58,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
@@ -117,6 +118,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER =
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
+ static final PropertyDescriptor
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER =
AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Listen claims.")
@@ -173,6 +175,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
+ EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
CONSUMER_GROUP,
ENQUEUE_TIME,
RECEIVER_FETCH_SIZE,
@@ -210,7 +213,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
+ return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
}
@OnPrimaryNodeStateChange
@@ -437,6 +440,13 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, tokenCredential);
}
+ case IDENTITY_FEDERATION -> {
+ final AzureIdentityFederationTokenProvider tokenProvider =
+
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
+
.asControllerService(AzureIdentityFederationTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, tokenCredential);
+ }
}
// Set Azure Event Hub Client Identifier using Processor Identifier
instead of default random UUID
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index 2ec7d310d1..9f07241186 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -51,6 +51,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
@@ -90,6 +91,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER =
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
+ static final PropertyDescriptor
EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER =
AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Send claims.")
@@ -134,6 +136,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
+ EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
PARTITIONING_KEY_ATTRIBUTE_NAME,
MAX_BATCH_SIZE,
PROXY_CONFIGURATION_SERVICE
@@ -172,7 +175,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
+ return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
}
@Override
@@ -261,6 +264,13 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, tokenCredential);
}
+ case IDENTITY_FEDERATION -> {
+ final AzureIdentityFederationTokenProvider tokenProvider =
+
context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER)
+
.asControllerService(AzureIdentityFederationTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, tokenCredential);
+ }
}
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
return eventHubClientBuilder.buildProducerClient();
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index 566d08b056..dc0f4365a1 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -25,9 +25,11 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.AccessToken;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
@@ -44,10 +46,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
public final class AzureEventHubUtils {
+ private static final long DEFAULT_TOKEN_EXPIRATION_SECONDS = 300;
+
public static final AllowableValue AZURE_ENDPOINT = new
AllowableValue(".servicebus.windows.net", "Azure", "Servicebus endpoint for
general use");
public static final AllowableValue AZURE_CHINA_ENDPOINT = new
AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus
endpoint for China");
public static final AllowableValue AZURE_GERMANY_ENDPOINT = new
AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint
for Germany");
@@ -76,44 +79,19 @@ public final class AzureEventHubUtils {
.required(true)
.build();
- private static final long DEFAULT_TOKEN_LIFETIME_SECONDS =
TimeUnit.MINUTES.toSeconds(5);
-
public static List<ValidationResult> customValidate(PropertyDescriptor
accessPolicyDescriptor,
PropertyDescriptor
policyKeyDescriptor,
- PropertyDescriptor
tokenProviderDescriptor,
ValidationContext
context) {
List<ValidationResult> validationResults = new ArrayList<>();
-
boolean accessPolicyIsSet =
context.getProperty(accessPolicyDescriptor).isSet();
boolean policyKeyIsSet =
context.getProperty(policyKeyDescriptor).isSet();
final AzureEventHubAuthenticationStrategy authenticationStrategy =
Optional.ofNullable(
-
context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY)
-
.asAllowableValue(AzureEventHubAuthenticationStrategy.class))
+
context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class))
.orElse(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY);
- final boolean tokenProviderIsSet = tokenProviderDescriptor != null &&
context.getProperty(tokenProviderDescriptor).isSet();
switch (authenticationStrategy) {
- case MANAGED_IDENTITY -> {
- if (accessPolicyIsSet || policyKeyIsSet) {
- final String msg = String.format(
- "When '%s' is set to '%s', '%s' and '%s' must not
be set.",
-
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(),
- accessPolicyDescriptor.getDisplayName(),
- policyKeyDescriptor.getDisplayName()
- );
- validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
- }
- if (tokenProviderIsSet) {
- validationResults.add(new ValidationResult.Builder()
-
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
- .valid(false)
- .explanation(String.format("'%s' must not be set
when '%s' is '%s'.",
- tokenProviderDescriptor.getDisplayName(),
-
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName()))
- .build());
- }
+ case MANAGED_IDENTITY, OAUTH2, IDENTITY_FEDERATION -> {
+ // Rely on required property + dependsOn validation to ensure
proper configuration
}
case SHARED_ACCESS_SIGNATURE -> {
if (!accessPolicyIsSet || !policyKeyIsSet) {
@@ -126,40 +104,9 @@ public final class AzureEventHubUtils {
);
validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
}
- if (tokenProviderIsSet) {
- validationResults.add(new ValidationResult.Builder()
-
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
- .valid(false)
- .explanation(String.format("'%s' must not be set
when '%s' is '%s'.",
- tokenProviderDescriptor.getDisplayName(),
-
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
- .build());
- }
- }
- case OAUTH2 -> {
- if (accessPolicyIsSet || policyKeyIsSet) {
- final String msg = String.format(
- "When '%s' is set to '%s', '%s' and '%s' must not
be set.",
-
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-
AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName(),
- accessPolicyDescriptor.getDisplayName(),
- policyKeyDescriptor.getDisplayName()
- );
- validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
- }
- if (!tokenProviderIsSet) {
- validationResults.add(new ValidationResult.Builder()
-
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
- .valid(false)
- .explanation(String.format("'%s' must be set when
'%s' is '%s'.",
- tokenProviderDescriptor.getDisplayName(),
-
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-
AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName()))
- .build());
- }
}
}
+
ProxyConfiguration.validateProxySpec(context, validationResults,
AzureEventHubComponent.PROXY_SPECS);
return validationResults;
}
@@ -215,24 +162,25 @@ public final class AzureEventHubUtils {
Objects.requireNonNull(tokenProvider, "OAuth2 Access Token Provider is
required");
return tokenRequestContext -> Mono.fromSupplier(() -> {
- final org.apache.nifi.oauth2.AccessToken accessDetails =
tokenProvider.getAccessDetails();
- final String accessToken = accessDetails.getAccessToken();
-
- if (accessToken == null || accessToken.isBlank()) {
- throw new IllegalStateException("OAuth2 Access Token Provider
returned an empty access token");
+ final AccessToken accessToken = tokenProvider.getAccessDetails();
+ Objects.requireNonNull(accessToken, "Access Token is required");
+ final String tokenValue = accessToken.getAccessToken();
+ if (tokenValue == null || tokenValue.isEmpty()) {
+ throw new IllegalStateException("Access Token value is
required");
}
-
- final Instant fetchTime = accessDetails.getFetchTime();
- final long expiresInSeconds = accessDetails.getExpiresIn();
- final Instant expirationInstant = expiresInSeconds > 0
- ? fetchTime.plusSeconds(expiresInSeconds)
- : fetchTime.plusSeconds(DEFAULT_TOKEN_LIFETIME_SECONDS);
- final OffsetDateTime expiresAt =
OffsetDateTime.ofInstant(expirationInstant, ZoneOffset.UTC);
-
- return new com.azure.core.credential.AccessToken(accessToken,
expiresAt);
+ final Instant fetchTime =
Objects.requireNonNull(accessToken.getFetchTime(), "Access Token fetch time
required");
+ final long expiresIn = accessToken.getExpiresIn();
+ final Instant expirationInstant = expiresIn > 0 ?
fetchTime.plusSeconds(expiresIn) :
fetchTime.plusSeconds(DEFAULT_TOKEN_EXPIRATION_SECONDS);
+ final OffsetDateTime expirationTime =
OffsetDateTime.ofInstant(expirationInstant, ZoneOffset.UTC);
+ return new com.azure.core.credential.AccessToken(tokenValue,
expirationTime);
});
}
+ public static TokenCredential createTokenCredential(final
AzureIdentityFederationTokenProvider tokenProvider) {
+ Objects.requireNonNull(tokenProvider, "Identity Federation Token
Provider is required");
+ return tokenProvider.getCredentials();
+ }
+
private static Proxy getProxy(ProxyConfiguration proxyConfiguration) {
final Proxy.Type type;
if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
index 24029168f6..d4e3e61a5e 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
@@ -58,6 +58,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import
org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
@@ -73,6 +74,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -365,8 +367,13 @@ public class CopyAzureBlobStorage_v12 extends
AbstractAzureBlobProcessor_v12 {
private static HttpAuthorization getHttpAuthorization(final
AzureStorageCredentialsDetails_v12 credentialsDetails) {
switch (credentialsDetails.getCredentialsType()) {
case ACCESS_TOKEN -> {
- TokenCredential credential = tokenRequestContext ->
Mono.just(credentialsDetails.getAccessToken());
- return getHttpAuthorizationFromTokenCredential(credential);
+ final TokenCredential accessTokenCredential =
tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken());
+ return
getHttpAuthorizationFromTokenCredential(accessTokenCredential);
+ }
+ case IDENTITY_FEDERATION -> {
+ final AzureIdentityFederationTokenProvider
identityTokenProvider = Objects.requireNonNull(
+ credentialsDetails.getIdentityTokenProvider(),
"Identity Federation Token Provider is required");
+ return
getHttpAuthorizationFromTokenCredential(identityTokenProvider.getCredentials());
}
case MANAGED_IDENTITY -> {
final ManagedIdentityCredential credential = new
ManagedIdentityCredentialBuilder()
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java
index 3318c8b817..9ab8544f97 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java
@@ -42,6 +42,7 @@ import org.apache.nifi.processors.azure.AzureServiceEndpoints;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
import reactor.core.publisher.Mono;
@@ -51,6 +52,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -177,8 +179,13 @@ public abstract class AbstractAzureQueueStorage_v12
extends AbstractProcessor {
.build());
break;
case ACCESS_TOKEN:
- TokenCredential credential = tokenRequestContext ->
Mono.just(storageCredentialsDetails.getAccessToken());
- clientBuilder.credential(credential);
+ final TokenCredential accessTokenCredential =
tokenRequestContext -> Mono.just(storageCredentialsDetails.getAccessToken());
+ clientBuilder.credential(accessTokenCredential);
+ break;
+ case IDENTITY_FEDERATION:
+ final AzureIdentityFederationTokenProvider
identityTokenProvider = Objects.requireNonNull(
+ storageCredentialsDetails.getIdentityTokenProvider(),
"Identity Federation Token Provider is required");
+
clientBuilder.credential(identityTokenProvider.getCredentials());
break;
}
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index be2d6f1fed..ecb547749f 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -30,6 +30,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.proxy.SocksVersion;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import
org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
@@ -87,7 +88,8 @@ public final class AzureStorageUtils {
AzureStorageCredentialsType.ACCOUNT_KEY,
AzureStorageCredentialsType.SAS_TOKEN,
AzureStorageCredentialsType.MANAGED_IDENTITY,
- AzureStorageCredentialsType.SERVICE_PRINCIPAL))
+ AzureStorageCredentialsType.SERVICE_PRINCIPAL,
+ AzureStorageCredentialsType.IDENTITY_FEDERATION))
.defaultValue(AzureStorageCredentialsType.SAS_TOKEN)
.build();
@@ -269,6 +271,14 @@ public final class AzureStorageUtils {
.dependsOn(CREDENTIALS_TYPE,
AzureStorageCredentialsType.SERVICE_PRINCIPAL)
.build();
+ public static final PropertyDescriptor IDENTITY_FEDERATION_TOKEN_PROVIDER
= new PropertyDescriptor.Builder()
+ .name("Identity Federation Token Provider")
+ .description("Controller Service that provides Azure credentials
via workload identity federation.")
+
.identifiesControllerService(AzureIdentityFederationTokenProvider.class)
+ .required(true)
+ .dependsOn(CREDENTIALS_TYPE,
AzureStorageCredentialsType.IDENTITY_FEDERATION)
+ .build();
+
private AzureStorageUtils() {
// do not instantiate
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
index fb811f6de2..0aceee924c 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
@@ -28,9 +28,12 @@ import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import reactor.core.publisher.Mono;
+import java.util.Objects;
+
public class BlobServiceClientFactory extends
AbstractStorageClientFactory<AzureStorageCredentialsDetails_v12,
BlobServiceClient> {
public BlobServiceClientFactory(final ComponentLog logger, final
ProxyOptions proxyOptions) {
@@ -73,12 +76,17 @@ public class BlobServiceClientFactory extends
AbstractStorageClientFactory<Azure
.clientSecret(credentialsDetails.getServicePrincipalClientSecret())
.httpClient(new NettyAsyncHttpClientBuilder()
.proxy(credentialsDetails.getProxyOptions())
- .build())
+ .build())
.build());
break;
case ACCESS_TOKEN:
- TokenCredential credential = tokenRequestContext ->
Mono.just(credentialsDetails.getAccessToken());
- clientBuilder.credential(credential);
+ final TokenCredential accessTokenCredential =
tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken());
+ clientBuilder.credential(accessTokenCredential);
+ break;
+ case IDENTITY_FEDERATION:
+ final AzureIdentityFederationTokenProvider
identityTokenProvider = Objects.requireNonNull(
+ credentialsDetails.getIdentityTokenProvider(),
"Identity Federation Token Provider is required");
+
clientBuilder.credential(identityTokenProvider.getCredentials());
break;
}
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
index bebf1cebd9..b1c7cd3152 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
@@ -31,6 +31,7 @@ import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import reactor.core.publisher.Mono;
@@ -46,6 +47,7 @@ public class DataLakeServiceClientFactory extends
AbstractStorageClientFactory<A
final String accountKey = credentialsDetails.getAccountKey();
final String sasToken = credentialsDetails.getSasToken();
final AccessToken accessToken = credentialsDetails.getAccessToken();
+ final AzureIdentityFederationTokenProvider identityTokenProvider =
credentialsDetails.getIdentityTokenProvider();
final String endpointSuffix = credentialsDetails.getEndpointSuffix();
final boolean useManagedIdentity =
credentialsDetails.getUseManagedIdentity();
final String managedIdentityClientId =
credentialsDetails.getManagedIdentityClientId();
@@ -64,6 +66,8 @@ public class DataLakeServiceClientFactory extends
AbstractStorageClientFactory<A
dataLakeServiceClientBuilder.credential(credential);
} else if (StringUtils.isNotBlank(sasToken)) {
dataLakeServiceClientBuilder.sasToken(sasToken);
+ } else if (identityTokenProvider != null) {
+
dataLakeServiceClientBuilder.credential(identityTokenProvider.getCredentials());
} else if (accessToken != null) {
final TokenCredential credential = tokenRequestContext ->
Mono.just(accessToken);
dataLakeServiceClientBuilder.credential(credential);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java
index fb648155c9..32f02b8ea2 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java
@@ -44,21 +44,24 @@ import java.util.List;
@Tags({"azure", "security", "credentials", "provider", "session"})
@CapabilityDescription("Provide credentials to use with an Azure client.")
public class StandardAzureCredentialsControllerService extends
AbstractControllerService implements AzureCredentialsService {
- public static AllowableValue DEFAULT_CREDENTIAL = new
AllowableValue("default-credential",
+ public static final AllowableValue DEFAULT_CREDENTIAL = new
AllowableValue("default-credential",
"Default Credential",
"Uses default credential chain. It first checks environment
variables, before trying managed identity.");
- public static AllowableValue MANAGED_IDENTITY = new
AllowableValue("managed-identity",
+ public static final AllowableValue MANAGED_IDENTITY = new
AllowableValue("managed-identity",
"Managed Identity",
"Azure Virtual Machine Managed Identity (it can only be used when
NiFi is running on Azure)");
- public static AllowableValue SERVICE_PRINCIPAL = new
AllowableValue("service-principal",
+ public static final AllowableValue SERVICE_PRINCIPAL = new
AllowableValue("service-principal",
"Service Principal",
"Azure Active Directory Service Principal with Client ID / Client
Secret of a registered application");
+ public static final AllowableValue IDENTITY_FEDERATION = new
AllowableValue("identity-federation",
+ "Identity Federation",
+ "Uses workload identity federation to obtain access tokens for
Azure clients via an external identity token.");
public static final PropertyDescriptor CREDENTIAL_CONFIGURATION_STRATEGY =
new PropertyDescriptor.Builder()
.name("Credential Configuration Strategy")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.sensitive(false)
- .allowableValues(DEFAULT_CREDENTIAL, MANAGED_IDENTITY,
SERVICE_PRINCIPAL)
+ .allowableValues(DEFAULT_CREDENTIAL, MANAGED_IDENTITY,
SERVICE_PRINCIPAL, IDENTITY_FEDERATION)
.defaultValue(DEFAULT_CREDENTIAL)
.build();
@@ -103,12 +106,21 @@ public class StandardAzureCredentialsControllerService
extends AbstractControlle
.dependsOn(CREDENTIAL_CONFIGURATION_STRATEGY, SERVICE_PRINCIPAL)
.build();
+ public static final PropertyDescriptor IDENTITY_FEDERATION_TOKEN_PROVIDER
= new PropertyDescriptor.Builder()
+ .name("Identity Federation Token Provider")
+ .description("Controller Service that provides Azure credentials
via workload identity federation.")
+
.identifiesControllerService(AzureIdentityFederationTokenProvider.class)
+ .required(true)
+ .dependsOn(CREDENTIAL_CONFIGURATION_STRATEGY, IDENTITY_FEDERATION)
+ .build();
+
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
CREDENTIAL_CONFIGURATION_STRATEGY,
MANAGED_IDENTITY_CLIENT_ID,
SERVICE_PRINCIPAL_TENANT_ID,
SERVICE_PRINCIPAL_CLIENT_ID,
- SERVICE_PRINCIPAL_CLIENT_SECRET
+ SERVICE_PRINCIPAL_CLIENT_SECRET,
+ IDENTITY_FEDERATION_TOKEN_PROVIDER
);
private TokenCredential credentials;
@@ -133,6 +145,8 @@ public class StandardAzureCredentialsControllerService
extends AbstractControlle
credentials = getManagedIdentityCredential(context);
} else if (SERVICE_PRINCIPAL.getValue().equals(configurationStrategy))
{
credentials = getServicePrincipalCredential(context);
+ } else if
(IDENTITY_FEDERATION.getValue().equals(configurationStrategy)) {
+ credentials = getIdentityFederationCredential(context);
} else {
final String errorMsg = String.format("Configuration Strategy [%s]
not recognized", configurationStrategy);
getLogger().error(errorMsg);
@@ -178,6 +192,12 @@ public class StandardAzureCredentialsControllerService
extends AbstractControlle
.build();
}
+ private TokenCredential getIdentityFederationCredential(final
ConfigurationContext context) { // resolves the configured Identity Federation Token Provider service and returns its credential
+ final AzureIdentityFederationTokenProvider
identityFederationTokenProvider =
context.getProperty(IDENTITY_FEDERATION_TOKEN_PROVIDER)
+
.asControllerService(AzureIdentityFederationTokenProvider.class);
+ return identityFederationTokenProvider.getCredentials(); // NOTE(review): no null guard here, unlike the Objects.requireNonNull checks used at the storage client call sites; presumably safe because the property is declared required - confirm
+ }
+
@Override
public String toString() {
return "StandardAzureCredentialsControllerService[id=" +
getIdentifier() + "]";
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java
new file mode 100644
index 0000000000..29e6fa4ce6
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.services.azure.util.AzureWorkloadIdentityCredentialUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Controller Service that provides Azure {@link TokenCredential} for workload identity federation.
+ * Uses {@link AzureWorkloadIdentityCredentialUtils} to build the credential from an external identity token.
+ * <p>
+ * The credential is built once, when the service is enabled, from the Tenant ID, Client ID and
+ * Client Assertion Provider properties, and that same instance is handed out by {@link #getCredentials()}.
+ * <p>
+ * Verification does not reuse the enabled credential: it builds a separate credential from the
+ * configuration context passed to {@code verify} and requests a token for the Azure Storage default scope
+ * to confirm that the exchange succeeds. Any exception raised during verification is converted into a
+ * FAILED verification result instead of being propagated.
+ */
+@Tags({"azure", "identity", "federation", "credentials", "workload"})
+@CapabilityDescription("Provides Azure TokenCredential for workload identity
federation. "
+ + "Exchanges external identity tokens (from an OAuth2 provider) for
Azure AD access tokens "
+ + "using Azure Identity SDK's ClientAssertionCredential with built-in
caching and retry logic.")
+public class StandardAzureIdentityFederationTokenProvider extends
AbstractControllerService
+ implements AzureIdentityFederationTokenProvider,
VerifiableControllerService {
+
+ private static final String DEFAULT_VERIFICATION_SCOPE =
"https://storage.azure.com/.default"; // scope requested by verify() only; not used when building the runtime credential
+ private static final String ERROR_EXCHANGE_FAILED = "Failed to exchange
workload identity token: %s"; // %s is filled with the caught exception's message
+ private static final String STEP_EXCHANGE_TOKEN = "Exchange workload
identity token"; // name of the single verification step reported by verify()
+
+ public static final PropertyDescriptor TENANT_ID = new
PropertyDescriptor.Builder()
+ .name("Tenant ID")
+ .description("Microsoft Entra tenant ID.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CLIENT_ID = new
PropertyDescriptor.Builder()
+ .name("Client ID")
+ .description("Application (client) ID of the Microsoft Entra
application registration configured for workload identity federation.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CLIENT_ASSERTION_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("Client Assertion Provider")
+ .description("Controller Service that retrieves the external
workload identity token (client assertion) exchanged with Azure AD.")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(true)
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
+ TENANT_ID,
+ CLIENT_ID,
+ CLIENT_ASSERTION_PROVIDER
+ );
+
+ private volatile TokenCredential credential; // assigned in onEnabled; has no initializer, so it is null until the service has been enabled
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final String tenantId = context.getProperty(TENANT_ID).getValue();
+ final String clientId = context.getProperty(CLIENT_ID).getValue();
+ final OAuth2AccessTokenProvider clientAssertionProvider =
context.getProperty(CLIENT_ASSERTION_PROVIDER)
+ .asControllerService(OAuth2AccessTokenProvider.class);
+
+ this.credential =
AzureWorkloadIdentityCredentialUtils.createCredential(tenantId, clientId,
clientAssertionProvider); // builds the credential only; no token is requested from Azure at enable time
+ }
+
+ @Override
+ public TokenCredential getCredentials() {
+ return credential; // returns the instance built in onEnabled, unchanged
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verify(final ConfigurationContext
context,
+ final ComponentLog
verificationLogger,
+ final Map<String, String>
variables) {
+ final ConfigVerificationResult.Builder resultBuilder = new
ConfigVerificationResult.Builder()
+ .verificationStepName(STEP_EXCHANGE_TOKEN);
+
+ try {
+ final String tenantId = context.getProperty(TENANT_ID).getValue();
+ final String clientId = context.getProperty(CLIENT_ID).getValue();
+ final OAuth2AccessTokenProvider verificationAssertionProvider =
context.getProperty(CLIENT_ASSERTION_PROVIDER)
+ .asControllerService(OAuth2AccessTokenProvider.class);
+
+ final TokenCredential verificationCredential =
AzureWorkloadIdentityCredentialUtils.createCredential(
+ tenantId, clientId, verificationAssertionProvider); // built from the context under verification; the credential field is neither read nor modified
+
+ final TokenRequestContext tokenRequestContext = new
TokenRequestContext().addScopes(DEFAULT_VERIFICATION_SCOPE);
+ verificationCredential.getToken(tokenRequestContext).block(); // waits for the token exchange to finish; the returned token is discarded and no timeout is passed to block()
+
+ return Collections.singletonList(resultBuilder
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully exchanged workload identity
token for an Azure AD access token")
+ .build());
+ } catch (final Exception e) { // every failure inside the try block is reported as a FAILED result instead of being rethrown
+ final String explanation = String.format(ERROR_EXCHANGE_FAILED,
e.getMessage());
+ verificationLogger.error(explanation, e);
+ return Collections.singletonList(resultBuilder
+ .outcome(Outcome.FAILED)
+ .explanation(explanation)
+ .build());
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java
index 6a223fcaae..2f93e87fdd 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java
@@ -28,6 +28,7 @@ import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processors.azure.AzureServiceEndpoints;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import java.util.List;
import java.util.Map;
@@ -88,6 +89,7 @@ public class ADLSCredentialsControllerService extends
AbstractControllerService
SERVICE_PRINCIPAL_TENANT_ID,
SERVICE_PRINCIPAL_CLIENT_ID,
SERVICE_PRINCIPAL_CLIENT_SECRET,
+ AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER,
PROXY_CONFIGURATION_SERVICE
);
@@ -129,6 +131,7 @@ public class ADLSCredentialsControllerService extends
AbstractControllerService
config.removeProperty(propNameUseManagedIdentity);
}
+
ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
}
@@ -152,6 +155,12 @@ public class ADLSCredentialsControllerService extends
AbstractControllerService
setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_ID,
PropertyValue::getValue,
ADLSCredentialsDetails.Builder::setServicePrincipalClientId, attributes);
setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_SECRET,
PropertyValue::getValue,
ADLSCredentialsDetails.Builder::setServicePrincipalClientSecret, attributes);
+ if
(context.getProperty(CREDENTIALS_TYPE).asAllowableValue(AzureStorageCredentialsType.class)
== AzureStorageCredentialsType.IDENTITY_FEDERATION) {
+ final AzureIdentityFederationTokenProvider identityTokenProvider =
context.getProperty(AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER)
+
.asControllerService(AzureIdentityFederationTokenProvider.class);
+ credentialsBuilder.setIdentityTokenProvider(identityTokenProvider);
+ }
+
credentialsBuilder.setProxyOptions(AzureStorageUtils.getProxyOptions(context));
return credentialsBuilder.build();
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
index 9c761b0e97..62a34fd4a0 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
@@ -27,6 +27,7 @@ import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processors.azure.AzureServiceEndpoints;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,7 @@ import java.util.Map;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ACCOUNT_KEY;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ACCOUNT_NAME;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.CREDENTIALS_TYPE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.MANAGED_IDENTITY_CLIENT_ID;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SAS_TOKEN;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_ID;
@@ -69,6 +71,7 @@ public class AzureStorageCredentialsControllerService_v12
extends AbstractContro
SERVICE_PRINCIPAL_TENANT_ID,
SERVICE_PRINCIPAL_CLIENT_ID,
SERVICE_PRINCIPAL_CLIENT_SECRET,
+ IDENTITY_FEDERATION_TOKEN_PROVIDER,
PROXY_CONFIGURATION_SERVICE
);
@@ -107,9 +110,16 @@ public class AzureStorageCredentialsControllerService_v12
extends AbstractContro
String servicePrincipalClientSecret =
context.getProperty(SERVICE_PRINCIPAL_CLIENT_SECRET).getValue();
return
AzureStorageCredentialsDetails_v12.createWithServicePrincipal(accountName,
endpointSuffix,
servicePrincipalTenantId, servicePrincipalClientId,
servicePrincipalClientSecret, proxyOptions);
- default:
- throw new IllegalArgumentException("Unhandled credentials
type: " + credentialsType);
+ case IDENTITY_FEDERATION:
+ final AzureIdentityFederationTokenProvider
identityTokenProvider = context.getProperty(IDENTITY_FEDERATION_TOKEN_PROVIDER)
+
.asControllerService(AzureIdentityFederationTokenProvider.class);
+ return
AzureStorageCredentialsDetails_v12.createWithIdentityTokenProvider(
+ accountName,
+ endpointSuffix,
+ identityTokenProvider);
}
+
+ throw new IllegalArgumentException("Unhandled credentials type: " +
credentialsType);
}
@Override
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/util/AzureWorkloadIdentityCredentialUtils.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/util/AzureWorkloadIdentityCredentialUtils.java
new file mode 100644
index 0000000000..bbb51aee34
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/util/AzureWorkloadIdentityCredentialUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.util;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.identity.ClientAssertionCredentialBuilder;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Utility class for building Azure {@link TokenCredential} objects for workload identity federation.
+ * <p>
+ * This utility simplifies the creation of {@link com.azure.identity.ClientAssertionCredential} by providing
+ * factory methods that accept common NiFi components like {@link OAuth2AccessTokenProvider}.
+ * <p>
+ * The three {@code createCredential} overloads form a delegation chain: the provider-based overload wraps
+ * the provider in a {@link Supplier} and calls the supplier-based overload, which in turn supplies a default
+ * HTTP client and calls the overload that actually builds the credential. All methods are static and the
+ * class cannot be instantiated.
+ */
+public final class AzureWorkloadIdentityCredentialUtils {
+
+ private AzureWorkloadIdentityCredentialUtils() {
+ // Utility class - prevent instantiation
+ }
+
+ /**
+ * Creates a {@link TokenCredential} for Azure workload identity federation using the provided
+ * OAuth2 access token provider as the source of client assertions.
+ * <p>
+ * The provider is not queried while the credential is being built. It is wrapped in a supplier that
+ * calls {@link OAuth2AccessTokenProvider#getAccessDetails()} each time the supplier is invoked, and that
+ * supplier throws {@link IllegalStateException} when the provider returns {@code null} or a blank token.
+ *
+ * @param tenantId the Microsoft Entra tenant ID
+ * @param clientId the application (client) ID of the Microsoft Entra application
+ * @param clientAssertionProvider the OAuth2 access token provider that supplies the client assertion token
+ * @return a TokenCredential configured for workload identity federation
+ * @throws NullPointerException if any parameter is null
+ */
+ public static TokenCredential createCredential(
+ final String tenantId,
+ final String clientId,
+ final OAuth2AccessTokenProvider clientAssertionProvider) {
+ Objects.requireNonNull(tenantId, "Tenant ID is required");
+ Objects.requireNonNull(clientId, "Client ID is required");
+ Objects.requireNonNull(clientAssertionProvider, "Client Assertion
Provider is required");
+
+ return createCredential(tenantId, clientId, () ->
getClientAssertion(clientAssertionProvider));
+ }
+
+ /**
+ * Creates a {@link TokenCredential} for Azure workload identity federation using a custom
+ * client assertion supplier.
+ * <p>
+ * Delegates to the overload that takes an {@link HttpClient}, passing a client freshly built with
+ * {@link NettyAsyncHttpClientBuilder} and no further configuration. A new HTTP client instance is
+ * therefore built on every call to this method.
+ *
+ * @param tenantId the Microsoft Entra tenant ID
+ * @param clientId the application (client) ID of the Microsoft Entra application
+ * @param clientAssertionSupplier a supplier that provides the client assertion token string
+ * @return a TokenCredential configured for workload identity federation
+ * @throws NullPointerException if any parameter is null
+ */
+ public static TokenCredential createCredential(
+ final String tenantId,
+ final String clientId,
+ final Supplier<String> clientAssertionSupplier) {
+ Objects.requireNonNull(tenantId, "Tenant ID is required");
+ Objects.requireNonNull(clientId, "Client ID is required");
+ Objects.requireNonNull(clientAssertionSupplier, "Client Assertion
Supplier is required");
+
+ return createCredential(tenantId, clientId, clientAssertionSupplier,
new NettyAsyncHttpClientBuilder().build());
+ }
+
+ /**
+ * Creates a {@link TokenCredential} for Azure workload identity federation using a custom
+ * client assertion supplier and HTTP client.
+ * <p>
+ * This is the overload that performs the construction: the four arguments are handed to a
+ * {@link ClientAssertionCredentialBuilder} and the built credential is returned. The supplier is only
+ * registered with the builder here; this method does not invoke it.
+ *
+ * @param tenantId the Microsoft Entra tenant ID
+ * @param clientId the application (client) ID of the Microsoft Entra application
+ * @param clientAssertionSupplier a supplier that provides the client assertion token string
+ * @param httpClient the HTTP client to use for token requests
+ * @return a TokenCredential configured for workload identity federation
+ * @throws NullPointerException if any parameter is null
+ */
+ public static TokenCredential createCredential(
+ final String tenantId,
+ final String clientId,
+ final Supplier<String> clientAssertionSupplier,
+ final HttpClient httpClient) {
+ Objects.requireNonNull(tenantId, "Tenant ID is required");
+ Objects.requireNonNull(clientId, "Client ID is required");
+ Objects.requireNonNull(clientAssertionSupplier, "Client Assertion
Supplier is required");
+ Objects.requireNonNull(httpClient, "HTTP Client is required");
+
+ return new ClientAssertionCredentialBuilder()
+ .tenantId(tenantId)
+ .clientId(clientId)
+ .clientAssertion(clientAssertionSupplier)
+ .httpClient(httpClient)
+ .build();
+ }
+
+ /**
+ * Extracts the client assertion token from an OAuth2 access token provider.
+ * <p>
+ * Reads the provider's current access details and returns the access token string they carry.
+ *
+ * @param provider the OAuth2 access token provider
+ * @return the client assertion token string, never null or blank
+ * @throws IllegalStateException if the provider returns null access details, or a token that is null or blank
+ */
+ private static String getClientAssertion(final OAuth2AccessTokenProvider
provider) {
+ final AccessToken accessToken = provider.getAccessDetails();
+ if (accessToken == null) {
+ throw new IllegalStateException("Client assertion provider
returned null");
+ }
+ final String assertion = accessToken.getAccessToken();
+ if (assertion == null || assertion.isBlank()) {
+ throw new IllegalStateException("Client assertion provider
returned empty token");
+ }
+ return assertion;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
index 88398e82ec..efcd1cf1ef 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
@@ -24,7 +24,8 @@ import org.apache.nifi.components.DescribedValue;
public enum AzureEventHubAuthenticationStrategy implements DescribedValue {
MANAGED_IDENTITY("Managed Identity", "Authenticate using the Managed
Identity of the hosting Azure resource."),
SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the
Shared Access Policy name and key."),
- OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider
backed by an Entra registered application.");
+ OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider
backed by an Entra registered application."),
+ IDENTITY_FEDERATION("Identity Federation", "Authenticate using a workload
identity token exchanged for an Azure AD access token.");
private final String displayName;
private final String description;
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
index caf503f024..28f738725d 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
@@ -22,6 +22,7 @@ import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
/**
* Azure Event Hub Component interface with shared properties
@@ -52,6 +53,14 @@ public interface AzureEventHubComponent {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.dependsOn(AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.OAUTH2)
.build();
+ PropertyDescriptor IDENTITY_FEDERATION_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("Event Hubs Identity Federation Token Provider")
+ .description("Controller Service exchanging workload identity
tokens for Azure AD access tokens when authenticating to Azure Event Hubs.")
+
.identifiesControllerService(AzureIdentityFederationTokenProvider.class)
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION)
+ .build();
ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH};
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new
PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS))
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
index 3ab03cc3f6..121f66339a 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
@@ -21,7 +21,8 @@ import org.apache.nifi.components.DescribedValue;
public enum BlobStorageAuthenticationStrategy implements DescribedValue {
STORAGE_ACCOUNT_KEY("Storage Account Key", "Authenticate to Azure Blob
Storage using the account key."),
SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure
Blob Storage using a SAS token."),
- OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2
Access Token Provider backed by an Entra registered application.");
+ OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2
Access Token Provider backed by an Entra registered application."),
+ IDENTITY_FEDERATION("Identity Federation", "Authenticate to Azure Blob
Storage using a workload identity token exchanged for an Azure AD access
token.");
private final String displayName;
private final String description;
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 2823c4f3fc..e76b973a78 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -21,5 +21,6 @@
org.apache.nifi.services.azure.data.explorer.StandardKustoIngestService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService
+org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider
org.apache.nifi.services.azure.storage.AzureBlobStorageFileResourceService
org.apache.nifi.services.azure.storage.AzureDataLakeStorageFileResourceService
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md
new file mode 100644
index 0000000000..33566d89eb
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md
@@ -0,0 +1,55 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# StandardAzureIdentityFederationTokenProvider
+
+The *StandardAzureIdentityFederationTokenProvider* provides an Azure
`TokenCredential` for workload identity federation. It exchanges tokens from
external identity providers for Azure AD access tokens using the Azure Identity
SDK's `ClientAssertionCredential`. This approach provides built-in token caching,
automatic refresh, and robust error handling.
+
+Components such as the ADLS and Azure Storage credentials controller services
reference this provider when the **Credentials Type** is set to **Identity
Federation**.
+
+
+## Configuration workflow
+
+1. **Client Assertion Provider** – Select a controller service that retrieves
the external workload identity token. The token is passed to Azure AD as the
`client_assertion` parameter.
+2. **Tenant ID** and **Client ID** – Provide the Microsoft Entra tenant and
application (client) ID for the federated app registration.
+
+At runtime the service provides a `TokenCredential` backed by
`ClientAssertionCredential`. When the consuming component requests a token
(specifying the appropriate scope), the credential exchanges the client
assertion for an Azure AD access token via
`https://login.microsoftonline.com/<tenant>/oauth2/v2.0/token`. The Azure
Identity SDK handles token caching and automatic refresh when tokens expire.
+
+Ensure the federated app registration has the necessary Azure RBAC roles (for
example *Storage Blob Data Contributor*, *Azure Event Hubs Data Receiver*, or
*Azure Event Hubs Data Sender*, as appropriate). Also ensure that the client
assertion provider refreshes assertions before they expire so that new Azure
access tokens can be obtained.
+
+
+## Azure Resource Scopes
+
+Different Azure services require different scopes when requesting tokens. The
scope is determined automatically by the consuming component based on the Azure
service being accessed:
+
+- `https://storage.azure.com/.default` – Azure Storage operations (Blob, ADLS,
Queue).
+- `https://eventhubs.azure.net/.default` – Event Hubs operations.
+- `https://management.azure.com/.default` – Azure Resource Manager APIs.
+
+> **Note**: Microsoft Entra accepts only a single resource scope
(`<resource>/.default`) per client credentials request.
+
+
+## Event Hub components
+
+- `GetAzureEventHub`, `PutAzureEventHub`, and `ConsumeAzureEventHub` support
the **Identity Federation** authentication strategy for Event Hubs connections.
+- `ConsumeAzureEventHub` also supports Identity Federation for the Blob
Storage checkpoint store.
+
+
+## Entra ID setup summary
+
+1. **Create or reuse an app registration** for NiFi in Microsoft Entra ID.
+2. **Add a federated credential** (Certificates & secrets → Federated
credentials) matching your issuer/subject. Set the audience to
`api://AzureADTokenExchange`.
+3. **Assign RBAC roles** to that app registration, such as `Storage Blob Data
Reader`/`Storage Blob Data Contributor` on the storage account.
+4. Record the **Tenant ID** and **Client ID** for configuring the controller
service in NiFi.
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md
index d67c55ed98..0217b975ee 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md
@@ -15,6 +15,12 @@
# ADLSCredentialsControllerService
+### Azure Identity Federation Token Provider
+
+When the **Credentials Type** property is set to `Identity Federation`,
configure the **Azure Identity Federation Token Provider** with a controller
service capable of exchanging workload identity tokens for Azure AD access
tokens. The provider must return an `access_token` issued by Microsoft Entra ID
(for example using the `StandardAzureIdentityFederationTokenProvider`). The
access token is converted to the Azure SDK representation and cached in memory
until it expires.
+
+The Azure client instances created by this service do not perform additional
token refresh on their own. Ensure the configured Azure Identity Federation
Token Provider automatically refreshes tokens before they expire, and that the
configured scopes or audiences grant access to the target storage resources.
+
### Security considerations of using Expression Language for sensitive
properties
Allowing Expression Language for a property has the advantage of configuring
the property dynamically via FlowFile
@@ -30,4 +36,4 @@ Best practices for using Expression Language for sensitive
properties:
* control access to the flow and to provenance repository
* encrypt disks storing FlowFiles and provenance data
* if the sensitive data is a temporary token (like the SAS token), use a
shorter lifetime and refresh the token
- periodically
\ No newline at end of file
+ periodically
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index d3a30188a2..a6ed0e0239 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -31,6 +31,7 @@ import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.services.azure.MockIdentityFederationTokenProvider;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.MockFlowFile;
@@ -66,6 +67,7 @@ public class GetAzureEventHubTest {
private static final String POLICY_KEY = "POLICY-KEY";
private static final String CONSUMER_GROUP = "$Default";
private static final String EVENT_HUB_OAUTH_SERVICE_ID =
"get-event-hub-oauth";
+ private static final String EVENT_HUB_IDENTITY_SERVICE_ID =
"get-event-hub-identity";
private static final Instant ENQUEUED_TIME = Instant.now();
private static final long SEQUENCE_NUMBER = 32;
private static final String OFFSET = "64";
@@ -170,6 +172,13 @@ public class GetAzureEventHubTest {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_OAUTH_SERVICE_ID);
}
+ private void configureEventHubIdentityTokenProvider() throws
InitializationException {
+ final MockIdentityFederationTokenProvider provider = new
MockIdentityFederationTokenProvider();
+ testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID,
provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_SERVICE_ID);
+ }
+
@Test
public void testPropertiesManagedIdentity() {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
@@ -193,6 +202,19 @@ public class GetAzureEventHubTest {
testRunner.assertValid();
}
+ @Test
+ public void testEventHubIdentityFederationRequiresTokenProvider() throws
InitializationException {
+ testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(GetAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue());
+
+ testRunner.assertNotValid();
+
+ configureEventHubIdentityTokenProvider();
+
+ testRunner.assertValid();
+ }
+
@Test
public void testRunNoEventsReceived() {
setProperties();
@@ -321,4 +343,5 @@ public class GetAzureEventHubTest {
return accessToken;
}
}
+
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 5dffa9350f..d189e419e7 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -27,6 +27,7 @@ import
org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.azure.MockIdentityFederationTokenProvider;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.MockPropertyConfiguration;
@@ -66,6 +67,7 @@ public class PutAzureEventHubTest {
private static final String PARTITION_KEY = "partition";
private static final String CONTENT = String.class.getSimpleName();
private static final String EVENT_HUB_OAUTH_SERVICE_ID =
"put-event-hub-oauth";
+ private static final String EVENT_HUB_IDENTITY_SERVICE_ID =
"put-event-hub-identity";
@Mock
EventHubProducerClient eventHubProducerClient;
@@ -161,6 +163,13 @@ public class PutAzureEventHubTest {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_OAUTH_SERVICE_ID);
}
+ private void configureEventHubIdentityTokenProvider() throws
InitializationException {
+ final MockIdentityFederationTokenProvider provider = new
MockIdentityFederationTokenProvider();
+ testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID,
provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_SERVICE_ID);
+ }
+
@Test
public void testPropertiesManagedIdentityEnabled() {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
@@ -184,6 +193,19 @@ public class PutAzureEventHubTest {
testRunner.assertValid();
}
+ @Test
+ public void testEventHubIdentityFederationRequiresTokenProvider() throws
InitializationException {
+ testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(PutAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue());
+
+ testRunner.assertNotValid();
+
+ configureEventHubIdentityTokenProvider();
+
+ testRunner.assertValid();
+ }
+
@Test
public void testRunNoFlowFiles() {
setProperties();
@@ -279,4 +301,5 @@ public class PutAzureEventHubTest {
return accessToken;
}
}
+
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 50a9c8cdf1..b1c2d10f32 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -48,6 +48,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.services.azure.MockIdentityFederationTokenProvider;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import
org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy;
@@ -106,7 +107,9 @@ public class TestConsumeAzureEventHub {
private static final String APPLICATION_PROPERTY = "application";
private static final String APPLICATION_ATTRIBUTE_NAME =
String.format("eventhub.property.%s", APPLICATION_PROPERTY);
private static final String EVENT_HUB_OAUTH_SERVICE_ID = "eventHubOauth";
+ private static final String EVENT_HUB_IDENTITY_SERVICE_ID =
"eventHubIdentity";
private static final String BLOB_OAUTH_SERVICE_ID = "blobOauth";
+ private static final String BLOB_IDENTITY_SERVICE_ID = "blobIdentity";
private static final String EXPECTED_TRANSIT_URI =
String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s",
EVENT_HUB_NAMESPACE,
@@ -202,7 +205,7 @@ public class TestConsumeAzureEventHub {
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_NAME);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY,
STORAGE_ACCOUNT_KEY);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN,
STORAGE_TOKEN);
- testRunner.assertNotValid();
+ testRunner.assertValid();
}
@Test
@@ -218,6 +221,20 @@ public class TestConsumeAzureEventHub {
testRunner.assertValid();
}
+ @Test
+ public void
testProcessorConfigValidityWithEventHubIdentityFederationRequiresTokenProvider()
throws InitializationException {
+ testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(ConsumeAzureEventHub.CHECKPOINT_STRATEGY,
CheckpointStrategy.COMPONENT_STATE.getValue());
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue());
+
+ testRunner.assertNotValid();
+
+ configureEventHubIdentityTokenProvider();
+
+ testRunner.assertValid();
+ }
+
@Test
public void
testProcessorConfigValidityWithBlobOAuthRequiresTokenProvider() throws
InitializationException {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
@@ -231,6 +248,20 @@ public class TestConsumeAzureEventHub {
testRunner.assertValid();
}
+ @Test
+ public void
testProcessorConfigValidityWithBlobIdentityFederationRequiresTokenProvider()
throws InitializationException {
+ testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_NAME);
+
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION.getValue());
+
+ testRunner.assertNotValid();
+
+ configureBlobIdentityTokenProvider();
+
+ testRunner.assertValid();
+ }
+
@Test
public void testProcessorConfigValidityWithTokenSet() throws
InitializationException {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
@@ -668,6 +699,13 @@ public class TestConsumeAzureEventHub {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_OAUTH_SERVICE_ID);
}
+ private void configureEventHubIdentityTokenProvider() throws
InitializationException {
+ final MockIdentityFederationTokenProvider provider = new
MockIdentityFederationTokenProvider();
+ testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID,
provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER,
EVENT_HUB_IDENTITY_SERVICE_ID);
+ }
+
private void configureBlobOAuthTokenProvider() throws
InitializationException {
final MockOAuth2AccessTokenProvider provider = new
MockOAuth2AccessTokenProvider();
testRunner.addControllerService(BLOB_OAUTH_SERVICE_ID, provider);
@@ -675,6 +713,13 @@ public class TestConsumeAzureEventHub {
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
BLOB_OAUTH_SERVICE_ID);
}
+ private void configureBlobIdentityTokenProvider() throws
InitializationException {
+ final MockIdentityFederationTokenProvider provider = new
MockIdentityFederationTokenProvider();
+ testRunner.addControllerService(BLOB_IDENTITY_SERVICE_ID, provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER,
BLOB_IDENTITY_SERVICE_ID);
+ }
+
private class MockConsumeAzureEventHub extends ConsumeAzureEventHub {
@Override
protected EventProcessorClient createClient(final ProcessContext
context) {
@@ -696,4 +741,5 @@ public class TestConsumeAzureEventHub {
return accessToken;
}
}
+
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/MockIdentityFederationTokenProvider.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/MockIdentityFederationTokenProvider.java
new file mode 100644
index 0000000000..8c0f47bb3f
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/MockIdentityFederationTokenProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import org.apache.nifi.controller.AbstractControllerService;
+import reactor.core.publisher.Mono;
+
+import java.time.OffsetDateTime;
+
+public class MockIdentityFederationTokenProvider extends
AbstractControllerService implements AzureIdentityFederationTokenProvider {
+
+ public static final String ACCESS_TOKEN_VALUE = "access-token";
+
+ @Override
+ public TokenCredential getCredentials() {
+ return tokenRequestContext -> Mono.just(new
AccessToken(ACCESS_TOKEN_VALUE, OffsetDateTime.now().plusHours(1)));
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java
index a899684465..b51d662233 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.services.azure;
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
@@ -23,17 +26,27 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
public class TestStandardAzureCredentialsControllerService {
private static final String CREDENTIALS_SERVICE_IDENTIFIER =
"credentials-service";
private static final String SAMPLE_MANAGED_CLIENT_ID =
"sample-managed-client-id";
+ private static final String TOKEN_PROVIDER_IDENTIFIER =
"identity-provider";
+
private TestRunner runner;
private StandardAzureCredentialsControllerService credentialsService;
+ private MockIdentityFederationTokenProvider tokenProvider;
@BeforeEach
public void setUp() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
credentialsService = new StandardAzureCredentialsControllerService();
runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER,
credentialsService);
+
+ tokenProvider = new MockIdentityFederationTokenProvider();
+ runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider);
+ runner.enableControllerService(tokenProvider);
}
@Test
@@ -43,7 +56,6 @@ public class TestStandardAzureCredentialsControllerService {
StandardAzureCredentialsControllerService.DEFAULT_CREDENTIAL);
runner.assertValid(credentialsService);
- // should still be valid be ignored until
CREDENTIAL_CONFIGURATION_STRATEGY is set to MANAGED_IDENTITY
runner.setProperty(credentialsService,
StandardAzureCredentialsControllerService.MANAGED_IDENTITY_CLIENT_ID,
SAMPLE_MANAGED_CLIENT_ID);
@@ -76,5 +88,31 @@ public class TestStandardAzureCredentialsControllerService {
runner.assertValid(credentialsService);
}
+ @Test
+ public void testIdentityFederationStrategyRequiresProvider() {
+ runner.setProperty(credentialsService,
+
StandardAzureCredentialsControllerService.CREDENTIAL_CONFIGURATION_STRATEGY,
+ StandardAzureCredentialsControllerService.IDENTITY_FEDERATION);
+
+ runner.assertNotValid(credentialsService);
+ }
+
+ @Test
+ public void testIdentityFederationStrategyProvidesTokenCredential() throws
Exception {
+ runner.setProperty(credentialsService,
+
StandardAzureCredentialsControllerService.CREDENTIAL_CONFIGURATION_STRATEGY,
+ StandardAzureCredentialsControllerService.IDENTITY_FEDERATION);
+ runner.setProperty(credentialsService,
+
StandardAzureCredentialsControllerService.IDENTITY_FEDERATION_TOKEN_PROVIDER,
+ TOKEN_PROVIDER_IDENTIFIER);
+
+ runner.assertValid(credentialsService);
+ runner.enableControllerService(credentialsService);
+
+ final TokenCredential tokenCredential =
credentialsService.getCredentials();
+ final AccessToken accessToken = tokenCredential.getToken(new
TokenRequestContext().addScopes("https://storage.azure.com/.default")).block();
+ assertNotNull(accessToken);
+ assertEquals(MockIdentityFederationTokenProvider.ACCESS_TOKEN_VALUE,
accessToken.getToken());
+ }
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java
new file mode 100644
index 0000000000..c6bff03375
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link StandardAzureIdentityFederationTokenProvider}.
+ *
+ * Note: These tests validate configuration and property handling only.
+ * Actual token exchange with Azure AD requires integration testing with real
+ * credentials.
+ */
+public class TestStandardAzureIdentityFederationTokenProvider {
+
+ private TestRunner runner;
+ private StandardAzureIdentityFederationTokenProvider tokenProvider;
+
+ /**
+ * Registers the provider under test as "identity-provider" (it is not enabled
+ * here) and registers and enables a mock OAuth2 provider as
+ * "assertion-provider" so tests can reference it by identifier.
+ */
+ @BeforeEach
+ public void setUp() throws InitializationException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+ tokenProvider = new StandardAzureIdentityFederationTokenProvider();
+ runner.addControllerService("identity-provider", tokenProvider);
+
+ final MockOAuth2AccessTokenProvider assertionProvider = new
MockOAuth2AccessTokenProvider();
+ runner.addControllerService("assertion-provider", assertionProvider);
+ runner.enableControllerService(assertionProvider);
+ }
+
+ /**
+ * Tenant ID, client ID and client assertion provider all set: expected valid.
+ */
+ @Test
+ public void testValidConfiguration() {
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.TENANT_ID, "tenant-id");
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.CLIENT_ID, "client-id");
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.CLIENT_ASSERTION_PROVIDER,
"assertion-provider");
+
+ runner.assertValid(tokenProvider);
+ }
+
+ /**
+ * TENANT_ID left unset while the other two properties are set: expected not valid.
+ */
+ @Test
+ public void testInvalidWithoutTenantId() {
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.CLIENT_ID, "client-id");
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.CLIENT_ASSERTION_PROVIDER,
"assertion-provider");
+
+ runner.assertNotValid(tokenProvider);
+ }
+
+ /**
+ * CLIENT_ID left unset while the other two properties are set: expected not valid.
+ */
+ @Test
+ public void testInvalidWithoutClientId() {
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.TENANT_ID, "tenant-id");
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.CLIENT_ASSERTION_PROVIDER,
"assertion-provider");
+
+ runner.assertNotValid(tokenProvider);
+ }
+
+ /**
+ * CLIENT_ASSERTION_PROVIDER left unset while tenant and client IDs are set:
+ * expected not valid.
+ */
+ @Test
+ public void testInvalidWithoutClientAssertionProvider() {
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.TENANT_ID, "tenant-id");
+ runner.setProperty(tokenProvider,
StandardAzureIdentityFederationTokenProvider.CLIENT_ID, "client-id");
+
+ runner.assertNotValid(tokenProvider);
+ }
+
+ /**
+ * Minimal OAuth2AccessTokenProvider stub that returns a new AccessToken on
+ * every call, with the access token "client-assertion-token" and an
+ * expires-in value of 600.
+ */
+ private static class MockOAuth2AccessTokenProvider extends
AbstractControllerService implements OAuth2AccessTokenProvider {
+ @Override
+ public AccessToken getAccessDetails() {
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken("client-assertion-token");
+ accessToken.setExpiresIn(600L);
+ return accessToken;
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java
index ad0c1ba83d..a32db164b0 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java
@@ -19,6 +19,8 @@ package org.apache.nifi.services.azure.storage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
+import org.apache.nifi.services.azure.MockIdentityFederationTokenProvider;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -36,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestADLSCredentialsControllerService {
public static final String CREDENTIALS_SERVICE_IDENTIFIER =
"credentials-service";
+ private static final String TOKEN_PROVIDER_IDENTIFIER = "oauth2-provider";
private static final String ACCOUNT_NAME_VALUE = "AccountName";
private static final String ACCOUNT_KEY_VALUE = "AccountKey";
@@ -48,12 +51,16 @@ public class TestADLSCredentialsControllerService {
private TestRunner runner;
private ADLSCredentialsControllerService credentialsService;
+ private MockIdentityFederationTokenProvider tokenProvider;
@BeforeEach
public void setUp() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
credentialsService = new ADLSCredentialsControllerService();
runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER,
credentialsService);
+ tokenProvider = new MockIdentityFederationTokenProvider();
+ runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider);
+ runner.enableControllerService(tokenProvider);
}
@Test
@@ -153,6 +160,23 @@ public class TestADLSCredentialsControllerService {
runner.assertNotValid(credentialsService);
}
+ /**
+ * IDENTITY_FEDERATION credentials type with an account name and a token
+ * provider reference configured: the service is expected to be valid.
+ */
+ @Test
+ public void testValidWithAccountNameAndIdentityFederation() {
+
configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION);
+ configureAccountName();
+ configureIdentityFederationProvider();
+
+ runner.assertValid(credentialsService);
+ }
+
+ /**
+ * IDENTITY_FEDERATION credentials type with an account name but without the
+ * token provider property: the service is expected to be not valid.
+ */
+ @Test
+ public void testNotValidWithIdentityFederationMissingProvider() {
+
configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION);
+ configureAccountName();
+
+ runner.assertNotValid(credentialsService);
+ }
+
@Test
public void testGetCredentialsDetailsWithAccountKey() throws Exception {
// GIVEN
@@ -245,6 +269,21 @@ public class TestADLSCredentialsControllerService {
assertNull(actual.getServicePrincipalClientSecret());
}
+ /**
+ * Enables the service with IDENTITY_FEDERATION configured and checks that the
+ * returned ADLSCredentialsDetails carries the account name and a non-null
+ * identity token provider.
+ */
+ @Test
+ public void testGetCredentialsDetailsWithIdentityFederation() throws
Exception {
+
configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION);
+ configureAccountName();
+ configureIdentityFederationProvider();
+
+ runner.enableControllerService(credentialsService);
+
+ // An empty attribute map is passed to getCredentialsDetails.
+ final ADLSCredentialsDetails actual =
credentialsService.getCredentialsDetails(new HashMap<>());
+
+ assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
+ // Only presence is asserted; the provider's identity is not compared.
+ final AzureIdentityFederationTokenProvider identityTokenProvider =
actual.getIdentityTokenProvider();
+ assertNotNull(identityTokenProvider);
+ }
+
@Test
public void testGetCredentialsDetailsWithSystemAssignedManagedIdentity()
throws Exception {
// GIVEN
@@ -405,8 +444,13 @@ public class TestADLSCredentialsControllerService {
runner.setProperty(credentialsService,
AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_SECRET,
SERVICE_PRINCIPAL_CLIENT_SECRET_VALUE);
}
+ /**
+ * Points the credentials service's IDENTITY_FEDERATION_TOKEN_PROVIDER property
+ * at the controller service registered under TOKEN_PROVIDER_IDENTIFIER.
+ */
+ private void configureIdentityFederationProvider() {
+ runner.setProperty(credentialsService,
AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER,
TOKEN_PROVIDER_IDENTIFIER);
+ }
+
private void configurePropertyUsingEL(PropertyDescriptor
propertyDescriptor, String variableName, String variableValue) {
runner.setProperty(credentialsService, propertyDescriptor,
String.format("${%s}", variableName));
runner.setEnvironmentVariableValue(variableName, variableValue);
}
+
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
index 94b61bb2e2..5136fdea1e 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
@@ -16,7 +16,10 @@
*/
package org.apache.nifi.services.azure.storage;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
+import org.apache.nifi.services.azure.MockIdentityFederationTokenProvider;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -35,11 +38,13 @@ import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.S
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_SECRET;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_TENANT_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class TestAzureStorageCredentialsControllerService_v12 {
public static final String CREDENTIALS_SERVICE_IDENTIFIER =
"credentials-service";
+ private static final String TOKEN_PROVIDER_IDENTIFIER = "oauth2-provider";
private static final String ACCOUNT_NAME_VALUE = "AccountName";
private static final String ACCOUNT_KEY_VALUE = "AccountKey";
@@ -51,12 +56,17 @@ public class
TestAzureStorageCredentialsControllerService_v12 {
private TestRunner runner;
private AzureStorageCredentialsControllerService_v12 credentialsService;
+ private MockIdentityFederationTokenProvider tokenProvider;
@BeforeEach
public void setUp() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
credentialsService = new
AzureStorageCredentialsControllerService_v12();
runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER,
credentialsService);
+
+ tokenProvider = new MockIdentityFederationTokenProvider();
+ runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider);
+ runner.enableControllerService(tokenProvider);
}
@Test
@@ -150,6 +160,39 @@ public class
TestAzureStorageCredentialsControllerService_v12 {
runner.assertNotValid(credentialsService);
}
+ /**
+ * Account name, IDENTITY_FEDERATION credentials type and a token provider
+ * reference configured: the service is expected to be valid.
+ */
+ @Test
+ public void testIdentityFederationCredentialsTypeValid() {
+ configureAccountName();
+
configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION);
+ configureIdentityFederationProvider();
+
+ runner.assertValid(credentialsService);
+ }
+
+ /**
+ * Account name and IDENTITY_FEDERATION credentials type configured, but no
+ * token provider property set: the service is expected to be not valid.
+ */
+ @Test
+ public void
testIdentityFederationCredentialsTypeNotValidWhenProviderMissing() {
+ configureAccountName();
+
configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION);
+
+ runner.assertNotValid(credentialsService);
+ }
+
+ /**
+ * Enables the service with IDENTITY_FEDERATION configured and checks that the
+ * returned details report the account name, the IDENTITY_FEDERATION
+ * credentials type, and a non-null identity token provider.
+ */
+ @Test
+ public void testGetCredentialsDetailsWithIdentityFederation() throws
Exception {
+ configureAccountName();
+
configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION);
+ configureIdentityFederationProvider();
+
+ runner.enableControllerService(credentialsService);
+
+ // An empty attribute map is passed to getCredentialsDetails.
+ final AzureStorageCredentialsDetails_v12 actual =
credentialsService.getCredentialsDetails(Collections.emptyMap());
+
+ assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
+ assertEquals(AzureStorageCredentialsType.IDENTITY_FEDERATION,
actual.getCredentialsType());
+ // Only presence is asserted; the provider's identity is not compared.
+ final AzureIdentityFederationTokenProvider identityTokenProvider =
actual.getIdentityTokenProvider();
+ assertNotNull(identityTokenProvider);
+ }
+
@Test
public void testGetCredentialsDetailsWithAccountKey() {
configureAccountName();
@@ -276,4 +319,9 @@ public class
TestAzureStorageCredentialsControllerService_v12 {
private void configureServicePrincipalClientSecret() {
runner.setProperty(credentialsService,
SERVICE_PRINCIPAL_CLIENT_SECRET, SERVICE_PRINCIPAL_CLIENT_SECRET_VALUE);
}
+
+ /**
+ * Points the credentials service's IDENTITY_FEDERATION_TOKEN_PROVIDER property
+ * at the controller service registered under TOKEN_PROVIDER_IDENTIFIER.
+ */
+ private void configureIdentityFederationProvider() {
+ runner.setProperty(credentialsService,
AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER,
TOKEN_PROVIDER_IDENTIFIER);
+ }
+
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/util/TestAzureWorkloadIdentityCredentialUtils.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/util/TestAzureWorkloadIdentityCredentialUtils.java
new file mode 100644
index 0000000000..f9e3968381
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/util/TestAzureWorkloadIdentityCredentialUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.util;
+
+import com.azure.core.credential.TokenCredential;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link AzureWorkloadIdentityCredentialUtils}.
+ *
+ * These tests cover credential construction and null-argument rejection only;
+ * none of them calls getToken() on the created credential.
+ */
+public class TestAzureWorkloadIdentityCredentialUtils {
+
+ private static final String TENANT_ID = "test-tenant-id";
+ private static final String CLIENT_ID = "test-client-id";
+ private static final String ASSERTION_TOKEN = "test-assertion-token";
+
+ /**
+ * The OAuth2AccessTokenProvider overload returns a non-null credential.
+ */
+ @Test
+ public void testCreateCredentialWithOAuth2Provider() {
+ final OAuth2AccessTokenProvider provider = new
MockOAuth2Provider(ASSERTION_TOKEN);
+
+ final TokenCredential credential =
AzureWorkloadIdentityCredentialUtils.createCredential(
+ TENANT_ID, CLIENT_ID, provider);
+
+ assertNotNull(credential);
+ }
+
+ /**
+ * The supplier overload returns a non-null credential.
+ */
+ @Test
+ public void testCreateCredentialWithSupplier() {
+ final TokenCredential credential =
AzureWorkloadIdentityCredentialUtils.createCredential(
+ TENANT_ID, CLIENT_ID, () -> ASSERTION_TOKEN);
+
+ assertNotNull(credential);
+ }
+
+ /**
+ * A null tenant ID is expected to raise NullPointerException.
+ */
+ @Test
+ public void testCreateCredentialWithNullTenantId() {
+ assertThrows(NullPointerException.class, () ->
+ AzureWorkloadIdentityCredentialUtils.createCredential(null,
CLIENT_ID, () -> ASSERTION_TOKEN));
+ }
+
+ /**
+ * A null client ID is expected to raise NullPointerException.
+ */
+ @Test
+ public void testCreateCredentialWithNullClientId() {
+ assertThrows(NullPointerException.class, () ->
+
AzureWorkloadIdentityCredentialUtils.createCredential(TENANT_ID, null, () ->
ASSERTION_TOKEN));
+ }
+
+ /**
+ * A null supplier is expected to raise NullPointerException. The cast on the
+ * null argument selects the supplier overload.
+ */
+ @Test
+ public void testCreateCredentialWithNullSupplier() {
+ assertThrows(NullPointerException.class, () ->
+
AzureWorkloadIdentityCredentialUtils.createCredential(TENANT_ID, CLIENT_ID,
(java.util.function.Supplier<String>) null));
+ }
+
+ /**
+ * A null provider is expected to raise NullPointerException. The cast on the
+ * null argument selects the OAuth2AccessTokenProvider overload.
+ */
+ @Test
+ public void testCreateCredentialWithNullProvider() {
+ assertThrows(NullPointerException.class, () ->
+
AzureWorkloadIdentityCredentialUtils.createCredential(TENANT_ID, CLIENT_ID,
(OAuth2AccessTokenProvider) null));
+ }
+
+ /**
+ * A provider whose getAccessDetails() returns null still yields a non-null
+ * credential at creation time.
+ */
+ @Test
+ public void testProviderReturnsNullAccessToken() {
+ final OAuth2AccessTokenProvider provider = new
MockOAuth2Provider(null, true);
+
+ final TokenCredential credential =
AzureWorkloadIdentityCredentialUtils.createCredential(
+ TENANT_ID, CLIENT_ID, provider);
+
+ // Credential is created, but calling getToken() would fail when the
+ // supplier is invoked.
+ // NOTE(review): getToken() is never called here, so that failure path is
+ // asserted by comment only - confirm against the utility implementation.
+ assertNotNull(credential);
+ }
+
+ /**
+ * A provider that returns an empty access token string still yields a
+ * non-null credential at creation time.
+ */
+ @Test
+ public void testProviderReturnsEmptyToken() {
+ final OAuth2AccessTokenProvider provider = new MockOAuth2Provider("");
+
+ final TokenCredential credential =
AzureWorkloadIdentityCredentialUtils.createCredential(
+ TENANT_ID, CLIENT_ID, provider);
+
+ // Credential is created, but calling getToken() would fail when the
+ // supplier is invoked.
+ // NOTE(review): getToken() is never called here, so that failure path is
+ // asserted by comment only - confirm against the utility implementation.
+ assertNotNull(credential);
+ }
+
+ /**
+ * OAuth2AccessTokenProvider stub. Returns null from getAccessDetails() when
+ * constructed with returnNull = true; otherwise returns a new AccessToken
+ * carrying the configured token value and an expires-in value of 600.
+ */
+ private static class MockOAuth2Provider extends AbstractControllerService
implements OAuth2AccessTokenProvider {
+ private final String tokenValue;
+ private final boolean returnNull;
+
+ MockOAuth2Provider(String tokenValue) {
+ this(tokenValue, false);
+ }
+
+ MockOAuth2Provider(String tokenValue, boolean returnNull) {
+ this.tokenValue = tokenValue;
+ this.returnNull = returnNull;
+ }
+
+ @Override
+ public AccessToken getAccessDetails() {
+ if (returnNull) {
+ return null;
+ }
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken(tokenValue);
+ accessToken.setExpiresIn(600L);
+ return accessToken;
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java
new file mode 100644
index 0000000000..6b278171f3
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure;
+
+import com.azure.core.credential.TokenCredential;
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * Controller Service that provides an Azure {@link TokenCredential} for
+ * workload identity federation.
+ * Implementations exchange an external identity token for an Azure AD access
+ * token suitable for
+ * Azure service clients (for example, Storage, Data Lake, or Event Hubs).
+ */
+public interface AzureIdentityFederationTokenProvider extends
ControllerService {
+
+ /**
+ * Returns a {@link TokenCredential} that can be used to authenticate with
+ * Azure services.
+ * The credential handles token acquisition and refresh automatically.
+ *
+ * @return a TokenCredential for Azure service authentication
+ */
+ TokenCredential getCredentials();
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
index b1780f9e16..e6ec397dd3 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
@@ -18,6 +18,7 @@ package org.apache.nifi.services.azure.storage;
import com.azure.core.credential.AccessToken;
import com.azure.core.http.ProxyOptions;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import java.util.Objects;
@@ -40,6 +41,7 @@ public class ADLSCredentialsDetails {
private final String servicePrincipalClientId;
private final String servicePrincipalClientSecret;
+ private final AzureIdentityFederationTokenProvider identityTokenProvider;
private final ProxyOptions proxyOptions;
public ADLSCredentialsDetails(
@@ -53,6 +55,7 @@ public class ADLSCredentialsDetails {
String servicePrincipalTenantId,
String servicePrincipalClientId,
String servicePrincipalClientSecret,
+ AzureIdentityFederationTokenProvider identityTokenProvider,
ProxyOptions proxyOptions
) {
this.accountName = accountName;
@@ -65,6 +68,7 @@ public class ADLSCredentialsDetails {
this.servicePrincipalTenantId = servicePrincipalTenantId;
this.servicePrincipalClientId = servicePrincipalClientId;
this.servicePrincipalClientSecret = servicePrincipalClientSecret;
+ this.identityTokenProvider = identityTokenProvider;
this.proxyOptions = proxyOptions;
}
@@ -108,6 +112,10 @@ public class ADLSCredentialsDetails {
return servicePrincipalClientSecret;
}
+ /**
+ * Returns the identity federation token provider passed to the constructor.
+ *
+ * @return the configured provider; null when the Builder was never given one
+ * (the Builder field has no initializer)
+ */
+ public AzureIdentityFederationTokenProvider getIdentityTokenProvider() {
+ return identityTokenProvider;
+ }
+
public ProxyOptions getProxyOptions() {
return proxyOptions;
}
@@ -133,6 +141,7 @@ public class ADLSCredentialsDetails {
&& Objects.equals(servicePrincipalTenantId,
that.servicePrincipalTenantId)
&& Objects.equals(servicePrincipalClientId,
that.servicePrincipalClientId)
&& Objects.equals(servicePrincipalClientSecret,
that.servicePrincipalClientSecret)
+ && Objects.equals(identityTokenProvider,
that.identityTokenProvider)
&& equalsProxyOptions(proxyOptions, that.proxyOptions);
}
@@ -149,6 +158,7 @@ public class ADLSCredentialsDetails {
servicePrincipalTenantId,
servicePrincipalClientId,
servicePrincipalClientSecret,
+ identityTokenProvider,
hashCodeProxyOptions(proxyOptions)
);
}
@@ -164,6 +174,7 @@ public class ADLSCredentialsDetails {
private String servicePrincipalTenantId;
private String servicePrincipalClientId;
private String servicePrincipalClientSecret;
+ private AzureIdentityFederationTokenProvider identityTokenProvider;
private ProxyOptions proxyOptions;
private Builder() { }
@@ -222,6 +233,11 @@ public class ADLSCredentialsDetails {
return this;
}
+ /**
+ * Stores the identity federation token provider to pass to the built
+ * ADLSCredentialsDetails.
+ *
+ * @param identityTokenProvider provider to store; no null check is performed
+ * @return this Builder, for call chaining
+ */
+ public Builder setIdentityTokenProvider(final
AzureIdentityFederationTokenProvider identityTokenProvider) {
+ this.identityTokenProvider = identityTokenProvider;
+ return this;
+ }
+
public Builder setProxyOptions(ProxyOptions proxyOptions) {
this.proxyOptions = proxyOptions;
return this;
@@ -229,7 +245,7 @@ public class ADLSCredentialsDetails {
public ADLSCredentialsDetails build() {
return new ADLSCredentialsDetails(accountName, accountKey,
sasToken, endpointSuffix, accessToken, useManagedIdentity,
managedIdentityClientId,
- servicePrincipalTenantId, servicePrincipalClientId,
servicePrincipalClientSecret, proxyOptions);
+ servicePrincipalTenantId, servicePrincipalClientId,
servicePrincipalClientSecret, identityTokenProvider, proxyOptions);
}
}
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
index ee28670f9c..5b240b749c 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
@@ -18,6 +18,7 @@ package org.apache.nifi.services.azure.storage;
import com.azure.core.credential.AccessToken;
import com.azure.core.http.ProxyOptions;
+import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider;
import java.util.Objects;
@@ -36,11 +37,13 @@ public class AzureStorageCredentialsDetails_v12 {
private final String servicePrincipalClientId;
private final String servicePrincipalClientSecret;
private final AccessToken accessToken;
+ private final AzureIdentityFederationTokenProvider identityTokenProvider;
private final ProxyOptions proxyOptions;
private AzureStorageCredentialsDetails_v12(
String accountName, String endpointSuffix,
AzureStorageCredentialsType credentialsType, String accountKey, String
sasToken, String managedIdentityClientId,
- String servicePrincipalTenantId, String servicePrincipalClientId,
String servicePrincipalClientSecret, AccessToken accessToken, ProxyOptions
proxyOptions) {
+ String servicePrincipalTenantId, String servicePrincipalClientId,
String servicePrincipalClientSecret, AccessToken accessToken,
+ AzureIdentityFederationTokenProvider identityTokenProvider,
ProxyOptions proxyOptions) {
this.accountName = accountName;
this.endpointSuffix = endpointSuffix;
this.credentialsType = credentialsType;
@@ -51,6 +54,7 @@ public class AzureStorageCredentialsDetails_v12 {
this.servicePrincipalClientId = servicePrincipalClientId;
this.servicePrincipalClientSecret = servicePrincipalClientSecret;
this.accessToken = accessToken;
+ this.identityTokenProvider = identityTokenProvider;
this.proxyOptions = proxyOptions;
}
@@ -94,6 +98,10 @@ public class AzureStorageCredentialsDetails_v12 {
return accessToken;
}
+ /**
+ * Returns the identity federation token provider passed to the constructor.
+ *
+ * @return the provider; null for instances created by the other createWith*
+ * factory methods, which pass null for this field
+ */
+ public AzureIdentityFederationTokenProvider getIdentityTokenProvider() {
+ return identityTokenProvider;
+ }
+
public ProxyOptions getProxyOptions() {
return proxyOptions;
}
@@ -119,6 +127,7 @@ public class AzureStorageCredentialsDetails_v12 {
&& Objects.equals(servicePrincipalClientId,
that.servicePrincipalClientId)
&& Objects.equals(servicePrincipalClientSecret,
that.servicePrincipalClientSecret)
&& Objects.equals(accessToken, that.accessToken)
+ && Objects.equals(identityTokenProvider,
that.identityTokenProvider)
&& equalsProxyOptions(proxyOptions, that.proxyOptions);
}
@@ -135,6 +144,7 @@ public class AzureStorageCredentialsDetails_v12 {
servicePrincipalClientId,
servicePrincipalClientSecret,
accessToken,
+ identityTokenProvider,
hashCodeProxyOptions(proxyOptions)
);
}
@@ -143,14 +153,16 @@ public class AzureStorageCredentialsDetails_v12 {
String accountName,
String endpointSuffix,
String accountKey) {
- return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.ACCOUNT_KEY, accountKey, null,
null, null, null, null, null, null);
+ return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.ACCOUNT_KEY,
+ accountKey, null, null, null, null, null, null, null, null);
}
public static AzureStorageCredentialsDetails_v12 createWithSasToken(
String accountName,
String endpointSuffix,
String sasToken) {
- return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.SAS_TOKEN, null, sasToken, null,
null, null, null, null, null);
+ return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.SAS_TOKEN,
+ null, sasToken, null, null, null, null, null, null, null);
}
public static AzureStorageCredentialsDetails_v12 createWithManagedIdentity(
@@ -158,8 +170,8 @@ public class AzureStorageCredentialsDetails_v12 {
String endpointSuffix,
String managedIdentityClientId,
ProxyOptions proxyOptions) {
- return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.MANAGED_IDENTITY, null, null,
managedIdentityClientId,
- null, null, null, null, proxyOptions);
+ return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.MANAGED_IDENTITY,
+ null, null, managedIdentityClientId, null, null, null, null,
null, proxyOptions);
}
public static AzureStorageCredentialsDetails_v12
createWithServicePrincipal(
@@ -169,14 +181,23 @@ public class AzureStorageCredentialsDetails_v12 {
String servicePrincipalClientId,
String servicePrincipalClientSecret,
ProxyOptions proxyOptions) {
- return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.SERVICE_PRINCIPAL, null, null, null,
- servicePrincipalTenantId, servicePrincipalClientId,
servicePrincipalClientSecret, null, proxyOptions);
+ return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.SERVICE_PRINCIPAL,
+ null, null, null, servicePrincipalTenantId,
servicePrincipalClientId, servicePrincipalClientSecret, null, null,
proxyOptions);
}
public static AzureStorageCredentialsDetails_v12 createWithAccessToken(
String accountName,
String endpointSuffix,
AccessToken accessToken) {
- return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.ACCESS_TOKEN, null, null, null,
null, null, null, accessToken, null);
+ return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.ACCESS_TOKEN,
+ null, null, null, null, null, null, accessToken, null, null);
+ }
+
+ /**
+ * Creates credentials details of type IDENTITY_FEDERATION.
+ * Account key, SAS token, managed identity client id, service principal
+ * fields, access token and proxy options are all passed as null.
+ *
+ * @param accountName storage account name
+ * @param endpointSuffix storage endpoint suffix
+ * @param identityTokenProvider provider stored on the returned details; no
+ * null check is performed here
+ * @return a new AzureStorageCredentialsDetails_v12 instance
+ */
+ public static AzureStorageCredentialsDetails_v12
createWithIdentityTokenProvider(
+ String accountName,
+ String endpointSuffix,
+ AzureIdentityFederationTokenProvider identityTokenProvider) {
+ return new AzureStorageCredentialsDetails_v12(accountName,
endpointSuffix, AzureStorageCredentialsType.IDENTITY_FEDERATION, null, null,
null,
+ null, null, null, null, identityTokenProvider, null);
+ }
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
index 04921c9ec0..b63b4f220d 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
@@ -24,7 +24,8 @@ public enum AzureStorageCredentialsType implements
DescribedValue {
SAS_TOKEN("SAS Token", "SAS (Shared Access Signature) Token generated for
accessing resources in the storage account"),
MANAGED_IDENTITY("Managed Identity", "Azure Virtual Machine Managed
Identity (it can only be used when NiFi is running on Azure)"),
SERVICE_PRINCIPAL("Service Principal", "Azure Active Directory Service
Principal with Client Id / Client Secret of a registered application"),
- ACCESS_TOKEN("Access Token", "Access Token provided by custom controller
service implementations");
+ ACCESS_TOKEN("Access Token", "Access Token provided by custom controller
service implementations"),
+ IDENTITY_FEDERATION("Identity Federation", "Azure credential obtained via
workload identity federation using an external identity token");
private final String displayName;
private final String description;