This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bdcbec9d6c NIFI-15185 Added OAuth support for Azure Event Hubs
Processors (#10498)
bdcbec9d6c is described below
commit bdcbec9d6c6c049204f384c2677700aebb17892c
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Nov 25 17:58:51 2025 +0100
NIFI-15185 Added OAuth support for Azure Event Hubs Processors (#10498)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-azure-processors/pom.xml | 4 +
.../azure/eventhub/ConsumeAzureEventHub.java | 172 +++++++++++++++------
.../azure/eventhub/GetAzureEventHub.java | 36 +++--
.../azure/eventhub/PutAzureEventHub.java | 35 +++--
.../azure/eventhub/utils/AzureEventHubUtils.java | 121 ++++++++++++---
.../AzureEventHubAuthenticationStrategy.java | 3 +-
.../azure/eventhubs/AzureEventHubComponent.java | 9 ++
.../BlobStorageAuthenticationStrategy.java | 3 +-
.../azure/eventhub/GetAzureEventHubTest.java | 36 ++++-
.../azure/eventhub/PutAzureEventHubTest.java | 36 ++++-
.../azure/eventhub/TestConsumeAzureEventHub.java | 57 ++++++-
11 files changed, 425 insertions(+), 87 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index ae86d16cdb..ad003cb5ec 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -52,6 +52,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 2d9811198d..d9e07843a7 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -21,6 +21,7 @@ import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.core.credential.TokenCredential;
import com.azure.core.http.ProxyOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.identity.ManagedIdentityCredential;
@@ -58,6 +59,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -147,6 +149,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
+ static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER =
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY_NAME = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Listen claims.")
@@ -257,6 +260,13 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.required(true)
.dependsOn(CHECKPOINT_STRATEGY,
CheckpointStrategy.AZURE_BLOB_STORAGE)
.build();
+ static final PropertyDescriptor BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER
= new PropertyDescriptor.Builder()
+ .name("Storage Access Token Provider")
+ .description("Controller Service providing OAuth2 Access Tokens
for authenticating to Azure Blob Storage when persisting checkpoints.")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(true)
+ .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.OAUTH2)
+ .build();
static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new
PropertyDescriptor.Builder()
.name("Storage Account Key")
.description("The Azure Storage account key to store event hub
consumer group state.")
@@ -315,6 +325,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
+ EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
CONSUMER_GROUP,
RECORD_READER,
RECORD_WRITER,
@@ -328,6 +339,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
BLOB_STORAGE_AUTHENTICATION_STRATEGY,
STORAGE_ACCOUNT_KEY,
STORAGE_SAS_TOKEN,
+ BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
PROXY_CONFIGURATION_SERVICE
);
@@ -422,12 +434,13 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
final String storageAccountKey =
validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken =
validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
final CheckpointStrategy checkpointStrategy =
CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue());
+ final boolean blobOauthProviderSet =
validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet();
if ((recordReader != null && recordWriter == null) || (recordReader ==
null && recordWriter != null)) {
results.add(new ValidationResult.Builder()
.subject("Record Reader and Writer")
- .explanation(String.format("Both %s and %s should be set
in order to write FlowFiles as Records.",
- RECORD_READER.getDisplayName(),
RECORD_WRITER.getDisplayName()))
+ .explanation("Both %s and %s should be set in order to
write FlowFiles as Records."
+ .formatted(RECORD_READER.getDisplayName(),
RECORD_WRITER.getDisplayName()))
.valid(false)
.build());
}
@@ -441,10 +454,10 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
if (StringUtils.isBlank(storageAccountKey)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_ACCOUNT_KEY.getDisplayName())
- .explanation(String.format("%s must be set when %s
is %s.",
- STORAGE_ACCOUNT_KEY.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
+ .explanation("%s must be set when %s is %s."
+
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
.valid(false)
.build());
}
@@ -452,10 +465,10 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
if (StringUtils.isNotBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_SAS_TOKEN.getDisplayName())
- .explanation(String.format("%s must not be set
when %s is %s.",
- STORAGE_SAS_TOKEN.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
+ .explanation("%s must not be set when %s is %s."
+
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
.valid(false)
.build());
}
@@ -463,10 +476,32 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
if (StringUtils.isBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_SAS_TOKEN.getDisplayName())
- .explanation(String.format("%s must be set when %s
is %s.",
- STORAGE_SAS_TOKEN.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+ .explanation("%s must be set when %s is %s."
+
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+ .valid(false)
+ .build());
+ }
+
+ if (StringUtils.isNotBlank(storageAccountKey)) {
+ results.add(new ValidationResult.Builder()
+ .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
+ .explanation("%s must not be set when %s is %s."
+
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+ .valid(false)
+ .build());
+ }
+ } else if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.OAUTH2) {
+ if (!blobOauthProviderSet) {
+ results.add(new ValidationResult.Builder()
+
.subject(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName())
+ .explanation("%s must be set when %s is %s."
+
.formatted(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
.valid(false)
.build());
}
@@ -474,16 +509,27 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
if (StringUtils.isNotBlank(storageAccountKey)) {
results.add(new ValidationResult.Builder()
.subject(STORAGE_ACCOUNT_KEY.getDisplayName())
- .explanation(String.format("%s must not be set
when %s is %s.",
- STORAGE_ACCOUNT_KEY.getDisplayName(),
-
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+ .explanation("%s must not be set when %s is %s."
+
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
+ .valid(false)
+ .build());
+ }
+
+ if (StringUtils.isNotBlank(storageSasToken)) {
+ results.add(new ValidationResult.Builder()
+ .subject(STORAGE_SAS_TOKEN.getDisplayName())
+ .explanation("%s must not be set when %s is %s."
+
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
.valid(false)
.build());
}
}
}
- results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY, validationContext));
+ results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext));
return results;
}
@@ -561,10 +607,29 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
final String containerName =
defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
eventHubName);
- final String storageConnectionString =
createStorageConnectionString(context);
- final BlobContainerClientBuilder blobContainerClientBuilder = new
BlobContainerClientBuilder()
- .connectionString(storageConnectionString)
- .containerName(containerName);
+ final String storageAccountName =
context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+ final String domainName = getStorageDomainName(serviceBusEndpoint);
+ final BlobStorageAuthenticationStrategy
blobStorageAuthenticationStrategy =
+
context.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);
+
+ final BlobContainerClientBuilder blobContainerClientBuilder = new
BlobContainerClientBuilder();
+
+ switch (blobStorageAuthenticationStrategy) {
+ case STORAGE_ACCOUNT_KEY, SHARED_ACCESS_SIGNATURE -> {
+ final String storageConnectionString =
createStorageConnectionString(context, blobStorageAuthenticationStrategy,
storageAccountName, domainName);
+
blobContainerClientBuilder.connectionString(storageConnectionString);
+ }
+ case OAUTH2 -> {
+ final OAuth2AccessTokenProvider tokenProvider =
+
context.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+ final String endpoint =
createBlobEndpoint(storageAccountName, domainName);
+ blobContainerClientBuilder.endpoint(endpoint);
+ blobContainerClientBuilder.credential(tokenCredential);
+ }
+ }
+ blobContainerClientBuilder.containerName(containerName);
+
final ProxyOptions storageProxyOptions =
AzureStorageUtils.getProxyOptions(context);
if (storageProxyOptions != null) {
blobContainerClientBuilder.clientOptions(new
HttpClientOptions().setProxyOptions(storageProxyOptions));
@@ -593,17 +658,30 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.processError(errorProcessor)
.processEventBatch(eventBatchProcessor, maxBatchSize,
maxWaitTime);
- final AzureEventHubAuthenticationStrategy authenticationStrategy =
+ final AzureEventHubAuthenticationStrategy
configuredAuthenticationStrategy =
context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class);
- if (authenticationStrategy == null || authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
- final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
- final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
- eventProcessorClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
- } else {
- final String policyName =
context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
- final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
- final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
- eventProcessorClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
+ final AzureEventHubAuthenticationStrategy authenticationStrategy =
configuredAuthenticationStrategy == null
+ ? AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY
+ : configuredAuthenticationStrategy;
+
+ switch (authenticationStrategy) {
+ case MANAGED_IDENTITY -> {
+ final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+ final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
+
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName,
managedIdentityCredential);
+ }
+ case SHARED_ACCESS_SIGNATURE -> {
+ final String policyName =
context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
+ final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
+
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName,
azureNamedKeyCredential);
+ }
+ case OAUTH2 -> {
+ final OAuth2AccessTokenProvider tokenProvider =
+
context.getProperty(EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName,
tokenCredential);
+ }
}
final Integer prefetchCount =
context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
@@ -832,20 +910,28 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
session.getProvenanceReporter().receive(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
- private String createStorageConnectionString(final ProcessContext context)
{
- final String storageAccountName =
context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
- final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
- final String domainName = serviceBusEndpoint.replace(".servicebus.",
"");
+ private String createStorageConnectionString(final ProcessContext context,
+ final
BlobStorageAuthenticationStrategy blobStorageAuthenticationStrategy,
+ final String
storageAccountName,
+ final String domainName) {
final String storageAccountKey =
context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken =
context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
- final BlobStorageAuthenticationStrategy
blobStorageAuthenticationStrategy =
-
context.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);
+ return switch (blobStorageAuthenticationStrategy) {
+ case STORAGE_ACCOUNT_KEY ->
+
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY,
storageAccountName, storageAccountKey, domainName);
+ case SHARED_ACCESS_SIGNATURE ->
+
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN,
storageAccountName, domainName, storageSasToken);
+ case OAUTH2 -> throw new IllegalArgumentException(String.format(
+ "Blob Storage Authentication Strategy %s does not support
connection string authentication", blobStorageAuthenticationStrategy));
+ };
+ }
- if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) {
- return
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY,
storageAccountName, storageAccountKey, domainName);
- }
+ private String createBlobEndpoint(final String storageAccountName, final
String domainName) {
+ return String.format("https://%s.blob.core.%s/", storageAccountName,
domainName);
+ }
- return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN,
storageAccountName, domainName, storageSasToken);
+ private String getStorageDomainName(final String serviceBusEndpoint) {
+ return serviceBusEndpoint.replace(".servicebus.", "");
}
private Map<String, EventPosition> getLegacyPartitionEventPosition(
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 1e31f383e2..d8dc8b4611 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.eventhub;
import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.core.credential.TokenCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
@@ -58,6 +59,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.util.StopWatch;
import java.time.Duration;
@@ -113,6 +115,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
+ static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER =
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Listen claims.")
@@ -168,6 +171,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
ACCESS_POLICY,
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
+ EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
CONSUMER_GROUP,
ENQUEUE_TIME,
RECEIVER_FETCH_SIZE,
@@ -205,7 +209,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
+ return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
}
@OnPrimaryNodeStateChange
@@ -410,15 +414,27 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
final String consumerGroup =
context.getProperty(CONSUMER_GROUP).getValue();
eventHubClientBuilder.consumerGroup(consumerGroup);
- if (authenticationStrategy == null || authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
- final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
- final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
- eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
- } else {
- final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
- final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
- final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
- eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
+ final AzureEventHubAuthenticationStrategy
resolvedAuthenticationStrategy =
+ authenticationStrategy == null ?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY : authenticationStrategy;
+
+ switch (resolvedAuthenticationStrategy) {
+ case MANAGED_IDENTITY -> {
+ final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+ final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
+ }
+ case SHARED_ACCESS_SIGNATURE -> {
+ final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
+ final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
+ }
+ case OAUTH2 -> {
+ final OAuth2AccessTokenProvider tokenProvider =
+
context.getProperty(EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, tokenCredential);
+ }
}
// Set Azure Event Hub Client Identifier using Processor Identifier
instead of default random UUID
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index 991b19428c..d34b3a872d 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.eventhub;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.core.credential.TokenCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
@@ -40,6 +41,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -86,6 +88,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
+ static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER =
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Send claims.")
@@ -129,6 +132,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
ACCESS_POLICY,
POLICY_PRIMARY_KEY,
AUTHENTICATION_STRATEGY,
+ EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
PARTITIONING_KEY_ATTRIBUTE_NAME,
MAX_BATCH_SIZE,
PROXY_CONFIGURATION_SERVICE
@@ -167,7 +171,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
+ return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
}
@Override
@@ -235,15 +239,26 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
eventHubClientBuilder.transportType(transportType);
final String fullyQualifiedNamespace = String.format("%s%s",
namespace, serviceBusEndpoint);
- if (authenticationStrategy == null || authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
- final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
- final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
- eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
- } else {
- final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
- final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
- final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
- eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
+ final AzureEventHubAuthenticationStrategy
resolvedAuthenticationStrategy =
+ authenticationStrategy == null ?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY : authenticationStrategy;
+ switch (resolvedAuthenticationStrategy) {
+ case MANAGED_IDENTITY -> {
+ final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+ final ManagedIdentityCredential managedIdentityCredential
= managedIdentityCredentialBuilder.build();
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
+ }
+ case SHARED_ACCESS_SIGNATURE -> {
+ final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
+ final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ final AzureNamedKeyCredential azureNamedKeyCredential =
new AzureNamedKeyCredential(policyName, policyKey);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
+ }
+ case OAUTH2 -> {
+ final OAuth2AccessTokenProvider tokenProvider =
+
context.getProperty(EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ final TokenCredential tokenCredential =
AzureEventHubUtils.createTokenCredential(tokenProvider);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, tokenCredential);
+ }
}
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
return eventHubClientBuilder.buildProducerClient();
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index 0c3f657b18..566d08b056 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -18,25 +18,33 @@ package org.apache.nifi.processors.azure.eventhub.utils;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
+import com.azure.core.credential.TokenCredential;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
+import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.net.Proxy;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
public final class AzureEventHubUtils {
@@ -68,8 +76,11 @@ public final class AzureEventHubUtils {
.required(true)
.build();
+ private static final long DEFAULT_TOKEN_LIFETIME_SECONDS =
TimeUnit.MINUTES.toSeconds(5);
+
public static List<ValidationResult> customValidate(PropertyDescriptor
accessPolicyDescriptor,
PropertyDescriptor
policyKeyDescriptor,
+ PropertyDescriptor
tokenProviderDescriptor,
ValidationContext
context) {
List<ValidationResult> validationResults = new ArrayList<>();
@@ -79,25 +90,75 @@ public final class AzureEventHubUtils {
context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY)
.asAllowableValue(AzureEventHubAuthenticationStrategy.class))
.orElse(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY);
-
- if (authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY && (accessPolicyIsSet ||
policyKeyIsSet)) {
- final String msg = String.format(
- "When '%s' is set to '%s', '%s' and '%s' must not be set.",
-
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(),
- accessPolicyDescriptor.getDisplayName(),
- POLICY_PRIMARY_KEY.getDisplayName()
- );
- validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
- } else if (authenticationStrategy ==
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE &&
(!accessPolicyIsSet || !policyKeyIsSet)) {
- final String msg = String.format(
- "When '%s' is set to '%s', both '%s' and '%s' must be set",
-
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName(),
- accessPolicyDescriptor.getDisplayName(),
- POLICY_PRIMARY_KEY.getDisplayName()
- );
- validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
+ final boolean tokenProviderIsSet = tokenProviderDescriptor != null &&
context.getProperty(tokenProviderDescriptor).isSet();
+
+ switch (authenticationStrategy) {
+ case MANAGED_IDENTITY -> {
+ if (accessPolicyIsSet || policyKeyIsSet) {
+ final String msg = String.format(
+ "When '%s' is set to '%s', '%s' and '%s' must not
be set.",
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(),
+ accessPolicyDescriptor.getDisplayName(),
+ policyKeyDescriptor.getDisplayName()
+ );
+ validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
+ }
+ if (tokenProviderIsSet) {
+ validationResults.add(new ValidationResult.Builder()
+
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
+ .valid(false)
+ .explanation(String.format("'%s' must not be set
when '%s' is '%s'.",
+ tokenProviderDescriptor.getDisplayName(),
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName()))
+ .build());
+ }
+ }
+ case SHARED_ACCESS_SIGNATURE -> {
+ if (!accessPolicyIsSet || !policyKeyIsSet) {
+ final String msg = String.format(
+ "When '%s' is set to '%s', both '%s' and '%s' must
be set",
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName(),
+ accessPolicyDescriptor.getDisplayName(),
+ policyKeyDescriptor.getDisplayName()
+ );
+ validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
+ }
+ if (tokenProviderIsSet) {
+ validationResults.add(new ValidationResult.Builder()
+
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
+ .valid(false)
+ .explanation(String.format("'%s' must not be set
when '%s' is '%s'.",
+ tokenProviderDescriptor.getDisplayName(),
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+ .build());
+ }
+ }
+ case OAUTH2 -> {
+ if (accessPolicyIsSet || policyKeyIsSet) {
+ final String msg = String.format(
+ "When '%s' is set to '%s', '%s' and '%s' must not
be set.",
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName(),
+ accessPolicyDescriptor.getDisplayName(),
+ policyKeyDescriptor.getDisplayName()
+ );
+ validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
+ }
+ if (!tokenProviderIsSet) {
+ validationResults.add(new ValidationResult.Builder()
+
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
+ .valid(false)
+ .explanation(String.format("'%s' must be set when
'%s' is '%s'.",
+ tokenProviderDescriptor.getDisplayName(),
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName()))
+ .build());
+ }
+ }
}
ProxyConfiguration.validateProxySpec(context, validationResults,
AzureEventHubComponent.PROXY_SPECS);
return validationResults;
@@ -150,6 +211,28 @@ public final class AzureEventHubUtils {
return Optional.ofNullable(proxyOptions);
}
+ public static TokenCredential createTokenCredential(final
OAuth2AccessTokenProvider tokenProvider) {
+ Objects.requireNonNull(tokenProvider, "OAuth2 Access Token Provider is
required");
+
+ return tokenRequestContext -> Mono.fromSupplier(() -> {
+ final org.apache.nifi.oauth2.AccessToken accessDetails =
tokenProvider.getAccessDetails();
+ final String accessToken = accessDetails.getAccessToken();
+
+ if (accessToken == null || accessToken.isBlank()) {
+ throw new IllegalStateException("OAuth2 Access Token Provider
returned an empty access token");
+ }
+
+ final Instant fetchTime = accessDetails.getFetchTime();
+ final long expiresInSeconds = accessDetails.getExpiresIn();
+ final Instant expirationInstant = expiresInSeconds > 0
+ ? fetchTime.plusSeconds(expiresInSeconds)
+ : fetchTime.plusSeconds(DEFAULT_TOKEN_LIFETIME_SECONDS);
+ final OffsetDateTime expiresAt =
OffsetDateTime.ofInstant(expirationInstant, ZoneOffset.UTC);
+
+ return new com.azure.core.credential.AccessToken(accessToken,
expiresAt);
+ });
+ }
+
private static Proxy getProxy(ProxyConfiguration proxyConfiguration) {
final Proxy.Type type;
if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
index 654871914c..88398e82ec 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
@@ -23,7 +23,8 @@ import org.apache.nifi.components.DescribedValue;
*/
public enum AzureEventHubAuthenticationStrategy implements DescribedValue {
MANAGED_IDENTITY("Managed Identity", "Authenticate using the Managed
Identity of the hosting Azure resource."),
- SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the
Shared Access Policy name and key.");
+ SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the
Shared Access Policy name and key."),
+ OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider
backed by an Entra registered application.");
private final String displayName;
private final String description;
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
index 4da423cb3a..caf503f024 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.shared.azure.eventhubs;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
@@ -43,6 +44,14 @@ public interface AzureEventHubComponent {
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
+ PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("Event Hubs Access Token Provider")
+ .description("Controller Service providing OAuth2 Access Tokens
for authenticating to Azure Event Hubs")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.OAUTH2)
+ .build();
ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH};
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new
PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS))
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
index 32c945bc96..3ab03cc3f6 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
@@ -20,7 +20,8 @@ import org.apache.nifi.components.DescribedValue;
public enum BlobStorageAuthenticationStrategy implements DescribedValue {
STORAGE_ACCOUNT_KEY("Storage Account Key", "Authenticate to Azure Blob
Storage using the account key."),
- SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure
Blob Storage using a SAS token.");
+ SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure
Blob Storage using a SAS token."),
+ OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2
Access Token Provider backed by an Entra registered application.");
private final String displayName;
private final String description;
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 1a7ffc5f67..c6f2c4cf80 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -21,6 +21,9 @@ import
com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
@@ -46,6 +49,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import static
org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +64,7 @@ public class GetAzureEventHubTest {
private static final String POLICY_NAME = "POLICY";
private static final String POLICY_KEY = "POLICY-KEY";
private static final String CONSUMER_GROUP = "$Default";
+ private static final String EVENT_HUB_OAUTH_SERVICE_ID =
"get-event-hub-oauth";
private static final Instant ENQUEUED_TIME = Instant.now();
private static final long SEQUENCE_NUMBER = 32;
private static final String OFFSET = "64";
@@ -156,6 +161,13 @@ public class GetAzureEventHubTest {
testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
+ private void configureEventHubOAuthTokenProvider() throws
InitializationException {
+ final MockOAuth2AccessTokenProvider provider = new
MockOAuth2AccessTokenProvider();
+ testRunner.addControllerService(EVENT_HUB_OAUTH_SERVICE_ID, provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_OAUTH_SERVICE_ID);
+ }
+
@Test
public void testPropertiesManagedIdentity() {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
@@ -166,6 +178,19 @@ public class GetAzureEventHubTest {
testRunner.assertValid();
}
+ @Test
+ public void testEventHubOAuthRequiresTokenProvider() throws
InitializationException {
+ testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(GetAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.OAUTH2.getValue());
+
+ testRunner.assertNotValid();
+
+ configureEventHubOAuthTokenProvider();
+
+ testRunner.assertValid();
+ }
+
@Test
public void testRunNoEventsReceived() {
setProperties();
@@ -265,7 +290,6 @@ public class GetAzureEventHubTest {
}
private static class MockEventData extends EventData {
-
private MockEventData() {
super(CONTENT);
}
@@ -285,4 +309,14 @@ public class GetAzureEventHubTest {
return ENQUEUED_TIME;
}
}
+
+ private static class MockOAuth2AccessTokenProvider extends
AbstractControllerService implements OAuth2AccessTokenProvider {
+ @Override
+ public AccessToken getAccessDetails() {
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken("access-token");
+ accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5));
+ return accessToken;
+ }
+ }
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 25a0318e47..777743ab04 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -18,6 +18,9 @@ package org.apache.nifi.processors.azure.eventhub;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
@@ -41,6 +44,7 @@ import java.net.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static
org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +64,7 @@ public class PutAzureEventHubTest {
private static final String PARTITION_KEY_ATTRIBUTE_NAME =
"eventPartitionKey";
private static final String PARTITION_KEY = "partition";
private static final String CONTENT = String.class.getSimpleName();
+ private static final String EVENT_HUB_OAUTH_SERVICE_ID =
"put-event-hub-oauth";
@Mock
EventHubProducerClient eventHubProducerClient;
@@ -147,6 +152,13 @@ public class PutAzureEventHubTest {
testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
+ private void configureEventHubOAuthTokenProvider() throws
InitializationException {
+ final MockOAuth2AccessTokenProvider provider = new
MockOAuth2AccessTokenProvider();
+ testRunner.addControllerService(EVENT_HUB_OAUTH_SERVICE_ID, provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_OAUTH_SERVICE_ID);
+ }
+
@Test
public void testPropertiesManagedIdentityEnabled() {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
@@ -157,6 +169,19 @@ public class PutAzureEventHubTest {
testRunner.assertValid();
}
+ @Test
+ public void testEventHubOAuthRequiresTokenProvider() throws
InitializationException {
+ testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(PutAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.OAUTH2.getValue());
+
+ testRunner.assertNotValid();
+
+ configureEventHubOAuthTokenProvider();
+
+ testRunner.assertValid();
+ }
+
@Test
public void testRunNoFlowFiles() {
setProperties();
@@ -228,7 +253,6 @@ public class PutAzureEventHubTest {
}
private class MockPutAzureEventHub extends PutAzureEventHub {
-
@Override
protected EventHubProducerClient createEventHubProducerClient(final
ProcessContext context) {
return eventHubProducerClient;
@@ -243,4 +267,14 @@ public class PutAzureEventHubTest {
testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertValid();
}
+
+ private static class MockOAuth2AccessTokenProvider extends
AbstractControllerService implements OAuth2AccessTokenProvider {
+ @Override
+ public AccessToken getAccessDetails() {
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken("access-token");
+ accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5));
+ return accessToken;
+ }
+ }
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 1a7a41f76b..32aed258e2 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -22,7 +22,10 @@ import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
+import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStrategy;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@@ -68,6 +71,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -100,6 +104,8 @@ public class TestConsumeAzureEventHub {
private static final String FOURTH_CONTENT = "CONTENT-4";
private static final String APPLICATION_PROPERTY = "application";
private static final String APPLICATION_ATTRIBUTE_NAME =
String.format("eventhub.property.%s", APPLICATION_PROPERTY);
+ private static final String EVENT_HUB_OAUTH_SERVICE_ID = "eventHubOauth";
+ private static final String BLOB_OAUTH_SERVICE_ID = "blobOauth";
private static final String EXPECTED_TRANSIT_URI =
String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s",
EVENT_HUB_NAMESPACE,
@@ -198,6 +204,32 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
}
+ @Test
+ public void
testProcessorConfigValidityWithEventHubOAuthRequiresTokenProvider() throws
InitializationException {
+ testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(ConsumeAzureEventHub.CHECKPOINT_STRATEGY,
CheckpointStrategy.COMPONENT_STATE.getValue());
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.OAUTH2.getValue());
+ testRunner.assertNotValid();
+
+ configureEventHubOAuthTokenProvider();
+
+ testRunner.assertValid();
+ }
+
+ @Test
+ public void
testProcessorConfigValidityWithBlobOAuthRequiresTokenProvider() throws
InitializationException {
+ testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
+ testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_NAME);
+
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.OAUTH2.getValue());
+ testRunner.assertNotValid();
+
+ configureBlobOAuthTokenProvider();
+
+ testRunner.assertValid();
+ }
+
@Test
public void testProcessorConfigValidityWithTokenSet() throws
InitializationException {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
@@ -627,8 +659,21 @@ public class TestConsumeAzureEventHub {
testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
- private class MockConsumeAzureEventHub extends ConsumeAzureEventHub {
+ private void configureEventHubOAuthTokenProvider() throws
InitializationException {
+ final MockOAuth2AccessTokenProvider provider = new
MockOAuth2AccessTokenProvider();
+ testRunner.addControllerService(EVENT_HUB_OAUTH_SERVICE_ID, provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
EVENT_HUB_OAUTH_SERVICE_ID);
+ }
+ private void configureBlobOAuthTokenProvider() throws
InitializationException {
+ final MockOAuth2AccessTokenProvider provider = new
MockOAuth2AccessTokenProvider();
+ testRunner.addControllerService(BLOB_OAUTH_SERVICE_ID, provider);
+ testRunner.enableControllerService(provider);
+
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
BLOB_OAUTH_SERVICE_ID);
+ }
+
+ private class MockConsumeAzureEventHub extends ConsumeAzureEventHub {
@Override
protected EventProcessorClient createClient(final ProcessContext
context) {
return eventProcessorClient;
@@ -639,4 +684,14 @@ public class TestConsumeAzureEventHub {
return EXPECTED_TRANSIT_URI;
}
}
+
+ private static class MockOAuth2AccessTokenProvider extends
AbstractControllerService implements OAuth2AccessTokenProvider {
+ @Override
+ public AccessToken getAccessDetails() {
+ final AccessToken accessToken = new AccessToken();
+ accessToken.setAccessToken("access-token");
+ accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5));
+ return accessToken;
+ }
+ }
}