This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bdcbec9d6c NIFI-15185 Added OAuth support for Azure Event Hubs 
Processors (#10498)
bdcbec9d6c is described below

commit bdcbec9d6c6c049204f384c2677700aebb17892c
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Nov 25 17:58:51 2025 +0100

    NIFI-15185 Added OAuth support for Azure Event Hubs Processors (#10498)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-azure-processors/pom.xml                  |   4 +
 .../azure/eventhub/ConsumeAzureEventHub.java       | 172 +++++++++++++++------
 .../azure/eventhub/GetAzureEventHub.java           |  36 +++--
 .../azure/eventhub/PutAzureEventHub.java           |  35 +++--
 .../azure/eventhub/utils/AzureEventHubUtils.java   | 121 ++++++++++++---
 .../AzureEventHubAuthenticationStrategy.java       |   3 +-
 .../azure/eventhubs/AzureEventHubComponent.java    |   9 ++
 .../BlobStorageAuthenticationStrategy.java         |   3 +-
 .../azure/eventhub/GetAzureEventHubTest.java       |  36 ++++-
 .../azure/eventhub/PutAzureEventHubTest.java       |  36 ++++-
 .../azure/eventhub/TestConsumeAzureEventHub.java   |  57 ++++++-
 11 files changed, 425 insertions(+), 87 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index ae86d16cdb..ad003cb5ec 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -52,6 +52,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-proxy-configuration-api</artifactId>
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 2d9811198d..d9e07843a7 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -21,6 +21,7 @@ import com.azure.core.amqp.AmqpTransportType;
 import com.azure.core.amqp.exception.AmqpErrorCondition;
 import com.azure.core.amqp.exception.AmqpException;
 import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.core.credential.TokenCredential;
 import com.azure.core.http.ProxyOptions;
 import com.azure.core.util.HttpClientOptions;
 import com.azure.identity.ManagedIdentityCredential;
@@ -58,6 +59,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -147,6 +149,7 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
             .build();
     static final PropertyDescriptor SERVICE_BUS_ENDPOINT = 
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
     static final PropertyDescriptor AUTHENTICATION_STRATEGY = 
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
+    static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = 
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
     static final PropertyDescriptor ACCESS_POLICY_NAME = new 
PropertyDescriptor.Builder()
             .name("Shared Access Policy Name")
             .description("The name of the shared access policy. This policy 
must have Listen claims.")
@@ -257,6 +260,13 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
             .required(true)
             .dependsOn(CHECKPOINT_STRATEGY, 
CheckpointStrategy.AZURE_BLOB_STORAGE)
             .build();
+    static final PropertyDescriptor BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER 
= new PropertyDescriptor.Builder()
+            .name("Storage Access Token Provider")
+            .description("Controller Service providing OAuth2 Access Tokens 
for authenticating to Azure Blob Storage when persisting checkpoints.")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, 
BlobStorageAuthenticationStrategy.OAUTH2)
+            .build();
     static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
             .name("Storage Account Key")
             .description("The Azure Storage account key to store event hub 
consumer group state.")
@@ -315,6 +325,7 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
             ACCESS_POLICY_NAME,
             POLICY_PRIMARY_KEY,
             AUTHENTICATION_STRATEGY,
+            EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
             CONSUMER_GROUP,
             RECORD_READER,
             RECORD_WRITER,
@@ -328,6 +339,7 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
             BLOB_STORAGE_AUTHENTICATION_STRATEGY,
             STORAGE_ACCOUNT_KEY,
             STORAGE_SAS_TOKEN,
+            BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
             PROXY_CONFIGURATION_SERVICE
     );
 
@@ -422,12 +434,13 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
         final String storageAccountKey = 
validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
         final String storageSasToken = 
validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
         final CheckpointStrategy checkpointStrategy = 
CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue());
+        final boolean blobOauthProviderSet = 
validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet();
 
         if ((recordReader != null && recordWriter == null) || (recordReader == 
null && recordWriter != null)) {
             results.add(new ValidationResult.Builder()
                     .subject("Record Reader and Writer")
-                    .explanation(String.format("Both %s and %s should be set 
in order to write FlowFiles as Records.",
-                            RECORD_READER.getDisplayName(), 
RECORD_WRITER.getDisplayName()))
+                    .explanation("Both %s and %s should be set in order to 
write FlowFiles as Records."
+                            .formatted(RECORD_READER.getDisplayName(), 
RECORD_WRITER.getDisplayName()))
                     .valid(false)
                     .build());
         }
@@ -441,10 +454,10 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
                 if (StringUtils.isBlank(storageAccountKey)) {
                     results.add(new ValidationResult.Builder()
                             .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
-                            .explanation(String.format("%s must be set when %s 
is %s.",
-                                    STORAGE_ACCOUNT_KEY.getDisplayName(),
-                                    
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-                                    
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
+                            .explanation("%s must be set when %s is %s."
+                                    
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
+                                            
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+                                            
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
                             .valid(false)
                             .build());
                 }
@@ -452,10 +465,10 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
                 if (StringUtils.isNotBlank(storageSasToken)) {
                     results.add(new ValidationResult.Builder()
                             .subject(STORAGE_SAS_TOKEN.getDisplayName())
-                            .explanation(String.format("%s must not be set 
when %s is %s.",
-                                    STORAGE_SAS_TOKEN.getDisplayName(),
-                                    
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-                                    
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
+                            .explanation("%s must not be set when %s is %s."
+                                    
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
+                                            
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+                                            
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
                             .valid(false)
                             .build());
                 }
@@ -463,10 +476,32 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
                 if (StringUtils.isBlank(storageSasToken)) {
                     results.add(new ValidationResult.Builder()
                             .subject(STORAGE_SAS_TOKEN.getDisplayName())
-                            .explanation(String.format("%s must be set when %s 
is %s.",
-                                    STORAGE_SAS_TOKEN.getDisplayName(),
-                                    
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-                                    
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+                            .explanation("%s must be set when %s is %s."
+                                    
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
+                                            
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+                                            
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+                            .valid(false)
+                            .build());
+                }
+
+                if (StringUtils.isNotBlank(storageAccountKey)) {
+                    results.add(new ValidationResult.Builder()
+                            .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
+                            .explanation("%s must not be set when %s is %s."
+                                    
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
+                                            
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+                                            
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+                            .valid(false)
+                            .build());
+                }
+            } else if (blobStorageAuthenticationStrategy == 
BlobStorageAuthenticationStrategy.OAUTH2) {
+                if (!blobOauthProviderSet) {
+                    results.add(new ValidationResult.Builder()
+                            
.subject(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName())
+                            .explanation("%s must be set when %s is %s."
+                                    
.formatted(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName(),
+                                            
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+                                            
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
                             .valid(false)
                             .build());
                 }
@@ -474,16 +509,27 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
                 if (StringUtils.isNotBlank(storageAccountKey)) {
                     results.add(new ValidationResult.Builder()
                             .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
-                            .explanation(String.format("%s must not be set 
when %s is %s.",
-                                    STORAGE_ACCOUNT_KEY.getDisplayName(),
-                                    
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
-                                    
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+                            .explanation("%s must not be set when %s is %s."
+                                    
.formatted(STORAGE_ACCOUNT_KEY.getDisplayName(),
+                                            
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+                                            
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
+                            .valid(false)
+                            .build());
+                }
+
+                if (StringUtils.isNotBlank(storageSasToken)) {
+                    results.add(new ValidationResult.Builder()
+                            .subject(STORAGE_SAS_TOKEN.getDisplayName())
+                            .explanation("%s must not be set when %s is %s."
+                                    
.formatted(STORAGE_SAS_TOKEN.getDisplayName(),
+                                            
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+                                            
BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName()))
                             .valid(false)
                             .build());
                 }
             }
         }
-        results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, 
POLICY_PRIMARY_KEY, validationContext));
+        results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, 
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext));
         return results;
     }
 
@@ -561,10 +607,29 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
 
         if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
             final String containerName = 
defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
 eventHubName);
-            final String storageConnectionString = 
createStorageConnectionString(context);
-            final BlobContainerClientBuilder blobContainerClientBuilder = new 
BlobContainerClientBuilder()
-                    .connectionString(storageConnectionString)
-                    .containerName(containerName);
+            final String storageAccountName = 
context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+            final String domainName = getStorageDomainName(serviceBusEndpoint);
+            final BlobStorageAuthenticationStrategy 
blobStorageAuthenticationStrategy =
+                    
context.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);
+
+            final BlobContainerClientBuilder blobContainerClientBuilder = new 
BlobContainerClientBuilder();
+
+            switch (blobStorageAuthenticationStrategy) {
+                case STORAGE_ACCOUNT_KEY, SHARED_ACCESS_SIGNATURE -> {
+                    final String storageConnectionString = 
createStorageConnectionString(context, blobStorageAuthenticationStrategy, 
storageAccountName, domainName);
+                    
blobContainerClientBuilder.connectionString(storageConnectionString);
+                }
+                case OAUTH2 -> {
+                    final OAuth2AccessTokenProvider tokenProvider =
+                            
context.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+                    final TokenCredential tokenCredential = 
AzureEventHubUtils.createTokenCredential(tokenProvider);
+                    final String endpoint = 
createBlobEndpoint(storageAccountName, domainName);
+                    blobContainerClientBuilder.endpoint(endpoint);
+                    blobContainerClientBuilder.credential(tokenCredential);
+                }
+            }
+            blobContainerClientBuilder.containerName(containerName);
+
             final ProxyOptions storageProxyOptions = 
AzureStorageUtils.getProxyOptions(context);
             if (storageProxyOptions != null) {
                 blobContainerClientBuilder.clientOptions(new 
HttpClientOptions().setProxyOptions(storageProxyOptions));
@@ -593,17 +658,30 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
                 .processError(errorProcessor)
                 .processEventBatch(eventBatchProcessor, maxBatchSize, 
maxWaitTime);
 
-        final AzureEventHubAuthenticationStrategy authenticationStrategy =
+        final AzureEventHubAuthenticationStrategy 
configuredAuthenticationStrategy =
                 
context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class);
-        if (authenticationStrategy == null || authenticationStrategy == 
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
-            final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
-            final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
-            eventProcessorClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
-        } else {
-            final String policyName = 
context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
-            final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
-            final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
-            eventProcessorClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
+        final AzureEventHubAuthenticationStrategy authenticationStrategy = 
configuredAuthenticationStrategy == null
+                ? AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY
+                : configuredAuthenticationStrategy;
+
+        switch (authenticationStrategy) {
+            case MANAGED_IDENTITY -> {
+                final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+                final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
+                
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, 
managedIdentityCredential);
+            }
+            case SHARED_ACCESS_SIGNATURE -> {
+                final String policyName = 
context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
+                final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+                final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
+                
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, 
azureNamedKeyCredential);
+            }
+            case OAUTH2 -> {
+                final OAuth2AccessTokenProvider tokenProvider =
+                        
context.getProperty(EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+                final TokenCredential tokenCredential = 
AzureEventHubUtils.createTokenCredential(tokenProvider);
+                
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, 
tokenCredential);
+            }
         }
 
         final Integer prefetchCount = 
context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
@@ -832,20 +910,28 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
         session.getProvenanceReporter().receive(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
     }
 
-    private String createStorageConnectionString(final ProcessContext context) 
{
-        final String storageAccountName = 
context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
-        final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
-        final String domainName = serviceBusEndpoint.replace(".servicebus.", 
"");
+    private String createStorageConnectionString(final ProcessContext context,
+                                                 final 
BlobStorageAuthenticationStrategy blobStorageAuthenticationStrategy,
+                                                 final String 
storageAccountName,
+                                                 final String domainName) {
         final String storageAccountKey = 
context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
         final String storageSasToken = 
context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
-        final BlobStorageAuthenticationStrategy 
blobStorageAuthenticationStrategy =
-                
context.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);
+        return switch (blobStorageAuthenticationStrategy) {
+            case STORAGE_ACCOUNT_KEY ->
+                    
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, 
storageAccountName, storageAccountKey, domainName);
+            case SHARED_ACCESS_SIGNATURE ->
+                    
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, 
storageAccountName, domainName, storageSasToken);
+            case OAUTH2 -> throw new IllegalArgumentException(String.format(
+                    "Blob Storage Authentication Strategy %s does not support 
connection string authentication", blobStorageAuthenticationStrategy));
+        };
+    }
 
-        if (blobStorageAuthenticationStrategy == 
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) {
-            return 
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, 
storageAccountName, storageAccountKey, domainName);
-        }
+    private String createBlobEndpoint(final String storageAccountName, final 
String domainName) {
+        return String.format("https://%s.blob.core.%s/";, storageAccountName, 
domainName);
+    }
 
-        return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, 
storageAccountName, domainName, storageSasToken);
+    private String getStorageDomainName(final String serviceBusEndpoint) {
+        return serviceBusEndpoint.replace(".servicebus.", "");
     }
 
     private Map<String, EventPosition> getLegacyPartitionEventPosition(
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 1e31f383e2..d8dc8b4611 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.eventhub;
 import com.azure.core.amqp.AmqpClientOptions;
 import com.azure.core.amqp.AmqpTransportType;
 import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.core.credential.TokenCredential;
 import com.azure.identity.ManagedIdentityCredential;
 import com.azure.identity.ManagedIdentityCredentialBuilder;
 import com.azure.messaging.eventhubs.EventData;
@@ -58,6 +59,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
 import 
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
 import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
 import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.util.StopWatch;
 
 import java.time.Duration;
@@ -113,6 +115,7 @@ public class GetAzureEventHub extends AbstractProcessor 
implements AzureEventHub
             .build();
     static final PropertyDescriptor SERVICE_BUS_ENDPOINT = 
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
     static final PropertyDescriptor AUTHENTICATION_STRATEGY = 
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
+    static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = 
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
     static final PropertyDescriptor ACCESS_POLICY = new 
PropertyDescriptor.Builder()
             .name("Shared Access Policy Name")
             .description("The name of the shared access policy. This policy 
must have Listen claims.")
@@ -168,6 +171,7 @@ public class GetAzureEventHub extends AbstractProcessor 
implements AzureEventHub
             ACCESS_POLICY,
             POLICY_PRIMARY_KEY,
             AUTHENTICATION_STRATEGY,
+            EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
             CONSUMER_GROUP,
             ENQUEUE_TIME,
             RECEIVER_FETCH_SIZE,
@@ -205,7 +209,7 @@ public class GetAzureEventHub extends AbstractProcessor 
implements AzureEventHub
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
-        return AzureEventHubUtils.customValidate(ACCESS_POLICY, 
POLICY_PRIMARY_KEY, context);
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, 
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
     }
 
     @OnPrimaryNodeStateChange
@@ -410,15 +414,27 @@ public class GetAzureEventHub extends AbstractProcessor 
implements AzureEventHub
         final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).getValue();
         eventHubClientBuilder.consumerGroup(consumerGroup);
 
-        if (authenticationStrategy == null || authenticationStrategy == 
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
-            final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
-            final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
-            eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
-        } else {
-            final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
-            final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
-            final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
-            eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
+        final AzureEventHubAuthenticationStrategy 
resolvedAuthenticationStrategy =
+                authenticationStrategy == null ? 
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY : authenticationStrategy;
+
+        switch (resolvedAuthenticationStrategy) {
+            case MANAGED_IDENTITY -> {
+                final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+                final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
+            }
+            case SHARED_ACCESS_SIGNATURE -> {
+                final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
+                final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+                final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
+            }
+            case OAUTH2 -> {
+                final OAuth2AccessTokenProvider tokenProvider =
+                        
context.getProperty(EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+                final TokenCredential tokenCredential = 
AzureEventHubUtils.createTokenCredential(tokenProvider);
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, tokenCredential);
+            }
         }
 
         // Set Azure Event Hub Client Identifier using Processor Identifier 
instead of default random UUID
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index 991b19428c..d34b3a872d 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.eventhub;
 
 import com.azure.core.amqp.AmqpTransportType;
 import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.core.credential.TokenCredential;
 import com.azure.identity.ManagedIdentityCredential;
 import com.azure.identity.ManagedIdentityCredentialBuilder;
 import com.azure.messaging.eventhubs.EventData;
@@ -40,6 +41,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -86,6 +88,7 @@ public class PutAzureEventHub extends AbstractProcessor 
implements AzureEventHub
             .build();
     static final PropertyDescriptor SERVICE_BUS_ENDPOINT = 
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
     static final PropertyDescriptor AUTHENTICATION_STRATEGY = 
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
+    static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = 
AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER;
     static final PropertyDescriptor ACCESS_POLICY = new 
PropertyDescriptor.Builder()
             .name("Shared Access Policy Name")
             .description("The name of the shared access policy. This policy 
must have Send claims.")
@@ -129,6 +132,7 @@ public class PutAzureEventHub extends AbstractProcessor 
implements AzureEventHub
             ACCESS_POLICY,
             POLICY_PRIMARY_KEY,
             AUTHENTICATION_STRATEGY,
+            EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
             PARTITIONING_KEY_ATTRIBUTE_NAME,
             MAX_BATCH_SIZE,
             PROXY_CONFIGURATION_SERVICE
@@ -167,7 +171,7 @@ public class PutAzureEventHub extends AbstractProcessor 
implements AzureEventHub
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
-        return AzureEventHubUtils.customValidate(ACCESS_POLICY, 
POLICY_PRIMARY_KEY, context);
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, 
POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context);
     }
 
     @Override
@@ -235,15 +239,26 @@ public class PutAzureEventHub extends AbstractProcessor 
implements AzureEventHub
             eventHubClientBuilder.transportType(transportType);
 
             final String fullyQualifiedNamespace = String.format("%s%s", 
namespace, serviceBusEndpoint);
-            if (authenticationStrategy == null || authenticationStrategy == 
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
-                final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
-                final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
-                eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
-            } else {
-                final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
-                final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
-                final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
-                eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
+            final AzureEventHubAuthenticationStrategy 
resolvedAuthenticationStrategy =
+                    authenticationStrategy == null ? 
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY : authenticationStrategy;
+            switch (resolvedAuthenticationStrategy) {
+                case MANAGED_IDENTITY -> {
+                    final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+                    final ManagedIdentityCredential managedIdentityCredential 
= managedIdentityCredentialBuilder.build();
+                    eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
+                }
+                case SHARED_ACCESS_SIGNATURE -> {
+                    final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
+                    final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+                    final AzureNamedKeyCredential azureNamedKeyCredential = 
new AzureNamedKeyCredential(policyName, policyKey);
+                    eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
+                }
+                case OAUTH2 -> {
+                    final OAuth2AccessTokenProvider tokenProvider =
+                            
context.getProperty(EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+                    final TokenCredential tokenCredential = 
AzureEventHubUtils.createTokenCredential(tokenProvider);
+                    eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, tokenCredential);
+                }
             }
             
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
             return eventHubClientBuilder.buildProducerClient();
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index 0c3f657b18..566d08b056 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -18,25 +18,33 @@ package org.apache.nifi.processors.azure.eventhub.utils;
 
 import com.azure.core.amqp.ProxyAuthenticationType;
 import com.azure.core.amqp.ProxyOptions;
+import com.azure.core.credential.TokenCredential;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import 
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
 import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
 import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
+import reactor.core.publisher.Mono;
 
 import java.net.InetSocketAddress;
 import java.net.Proxy;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 public final class AzureEventHubUtils {
 
@@ -68,8 +76,11 @@ public final class AzureEventHubUtils {
             .required(true)
             .build();
 
+    private static final long DEFAULT_TOKEN_LIFETIME_SECONDS = 
TimeUnit.MINUTES.toSeconds(5);
+
     public static List<ValidationResult> customValidate(PropertyDescriptor 
accessPolicyDescriptor,
                                                         PropertyDescriptor 
policyKeyDescriptor,
+                                                        PropertyDescriptor 
tokenProviderDescriptor,
                                                         ValidationContext 
context) {
         List<ValidationResult> validationResults = new ArrayList<>();
 
@@ -79,25 +90,75 @@ public final class AzureEventHubUtils {
                 
context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY)
                         
.asAllowableValue(AzureEventHubAuthenticationStrategy.class))
                 .orElse(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY);
-
-        if (authenticationStrategy == 
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY && (accessPolicyIsSet || 
policyKeyIsSet)) {
-            final String msg = String.format(
-                    "When '%s' is set to '%s', '%s' and '%s' must not be set.",
-                    
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-                    
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(),
-                    accessPolicyDescriptor.getDisplayName(),
-                    POLICY_PRIMARY_KEY.getDisplayName()
-            );
-            validationResults.add(new 
ValidationResult.Builder().subject("Credentials 
config").valid(false).explanation(msg).build());
-        } else if (authenticationStrategy == 
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE && 
(!accessPolicyIsSet || !policyKeyIsSet)) {
-            final String msg = String.format(
-                    "When '%s' is set to '%s', both '%s' and '%s' must be set",
-                    
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
-                    
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName(),
-                    accessPolicyDescriptor.getDisplayName(),
-                    POLICY_PRIMARY_KEY.getDisplayName()
-            );
-            validationResults.add(new 
ValidationResult.Builder().subject("Credentials 
config").valid(false).explanation(msg).build());
+        final boolean tokenProviderIsSet = tokenProviderDescriptor != null && 
context.getProperty(tokenProviderDescriptor).isSet();
+
+        switch (authenticationStrategy) {
+            case MANAGED_IDENTITY -> {
+                if (accessPolicyIsSet || policyKeyIsSet) {
+                    final String msg = String.format(
+                            "When '%s' is set to '%s', '%s' and '%s' must not 
be set.",
+                            
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+                            
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(),
+                            accessPolicyDescriptor.getDisplayName(),
+                            policyKeyDescriptor.getDisplayName()
+                    );
+                    validationResults.add(new 
ValidationResult.Builder().subject("Credentials 
config").valid(false).explanation(msg).build());
+                }
+                if (tokenProviderIsSet) {
+                    validationResults.add(new ValidationResult.Builder()
+                            
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
+                            .valid(false)
+                            .explanation(String.format("'%s' must not be set 
when '%s' is '%s'.",
+                                    tokenProviderDescriptor.getDisplayName(),
+                                    
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+                                    
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName()))
+                            .build());
+                }
+            }
+            case SHARED_ACCESS_SIGNATURE -> {
+                if (!accessPolicyIsSet || !policyKeyIsSet) {
+                    final String msg = String.format(
+                            "When '%s' is set to '%s', both '%s' and '%s' must 
be set",
+                            
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+                            
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName(),
+                            accessPolicyDescriptor.getDisplayName(),
+                            policyKeyDescriptor.getDisplayName()
+                    );
+                    validationResults.add(new 
ValidationResult.Builder().subject("Credentials 
config").valid(false).explanation(msg).build());
+                }
+                if (tokenProviderIsSet) {
+                    validationResults.add(new ValidationResult.Builder()
+                            
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
+                            .valid(false)
+                            .explanation(String.format("'%s' must not be set 
when '%s' is '%s'.",
+                                    tokenProviderDescriptor.getDisplayName(),
+                                    
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+                                    
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+                            .build());
+                }
+            }
+            case OAUTH2 -> {
+                if (accessPolicyIsSet || policyKeyIsSet) {
+                    final String msg = String.format(
+                            "When '%s' is set to '%s', '%s' and '%s' must not 
be set.",
+                            
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+                            
AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName(),
+                            accessPolicyDescriptor.getDisplayName(),
+                            policyKeyDescriptor.getDisplayName()
+                    );
+                    validationResults.add(new 
ValidationResult.Builder().subject("Credentials 
config").valid(false).explanation(msg).build());
+                }
+                if (!tokenProviderIsSet) {
+                    validationResults.add(new ValidationResult.Builder()
+                            
.subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName())
+                            .valid(false)
+                            .explanation(String.format("'%s' must be set when 
'%s' is '%s'.",
+                                    tokenProviderDescriptor.getDisplayName(),
+                                    
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+                                    
AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName()))
+                            .build());
+                }
+            }
         }
         ProxyConfiguration.validateProxySpec(context, validationResults, 
AzureEventHubComponent.PROXY_SPECS);
         return validationResults;
@@ -150,6 +211,28 @@ public final class AzureEventHubUtils {
         return Optional.ofNullable(proxyOptions);
     }
 
+    public static TokenCredential createTokenCredential(final 
OAuth2AccessTokenProvider tokenProvider) {
+        Objects.requireNonNull(tokenProvider, "OAuth2 Access Token Provider is 
required");
+
+        return tokenRequestContext -> Mono.fromSupplier(() -> {
+            final org.apache.nifi.oauth2.AccessToken accessDetails = 
tokenProvider.getAccessDetails();
+            final String accessToken = accessDetails.getAccessToken();
+
+            if (accessToken == null || accessToken.isBlank()) {
+                throw new IllegalStateException("OAuth2 Access Token Provider 
returned an empty access token");
+            }
+
+            final Instant fetchTime = accessDetails.getFetchTime();
+            final long expiresInSeconds = accessDetails.getExpiresIn();
+            final Instant expirationInstant = expiresInSeconds > 0
+                    ? fetchTime.plusSeconds(expiresInSeconds)
+                    : fetchTime.plusSeconds(DEFAULT_TOKEN_LIFETIME_SECONDS);
+            final OffsetDateTime expiresAt = 
OffsetDateTime.ofInstant(expirationInstant, ZoneOffset.UTC);
+
+            return new com.azure.core.credential.AccessToken(accessToken, 
expiresAt);
+        });
+    }
+
     private static Proxy getProxy(ProxyConfiguration proxyConfiguration) {
         final Proxy.Type type;
         if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
index 654871914c..88398e82ec 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
@@ -23,7 +23,8 @@ import org.apache.nifi.components.DescribedValue;
  */
 public enum AzureEventHubAuthenticationStrategy implements DescribedValue {
     MANAGED_IDENTITY("Managed Identity", "Authenticate using the Managed 
Identity of the hosting Azure resource."),
-    SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the 
Shared Access Policy name and key.");
+    SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the 
Shared Access Policy name and key."),
+    OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider 
backed by an Entra registered application.");
 
     private final String displayName;
     private final String description;
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
index 4da423cb3a..caf503f024 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.shared.azure.eventhubs;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxySpec;
@@ -43,6 +44,14 @@ public interface AzureEventHubComponent {
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
+    PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("Event Hubs Access Token Provider")
+            .description("Controller Service providing OAuth2 Access Tokens 
for authenticating to Azure Event Hubs")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(AUTHENTICATION_STRATEGY, 
AzureEventHubAuthenticationStrategy.OAUTH2)
+            .build();
     ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH};
     PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new 
PropertyDescriptor.Builder()
             
.fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS))
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
index 32c945bc96..3ab03cc3f6 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
@@ -20,7 +20,8 @@ import org.apache.nifi.components.DescribedValue;
 
 public enum BlobStorageAuthenticationStrategy implements DescribedValue {
     STORAGE_ACCOUNT_KEY("Storage Account Key", "Authenticate to Azure Blob 
Storage using the account key."),
-    SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure 
Blob Storage using a SAS token.");
+    SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure 
Blob Storage using a SAS token."),
+    OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2 
Access Token Provider backed by an Entra registered application.");
 
     private final String displayName;
     private final String description;
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 1a7ffc5f67..c6f2c4cf80 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -21,6 +21,9 @@ import 
com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
 import com.azure.messaging.eventhubs.models.PartitionContext;
 import com.azure.messaging.eventhubs.models.PartitionEvent;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
 import org.apache.nifi.proxy.ProxyConfiguration;
@@ -46,6 +49,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +64,7 @@ public class GetAzureEventHubTest {
     private static final String POLICY_NAME = "POLICY";
     private static final String POLICY_KEY = "POLICY-KEY";
     private static final String CONSUMER_GROUP = "$Default";
+    private static final String EVENT_HUB_OAUTH_SERVICE_ID = 
"get-event-hub-oauth";
     private static final Instant ENQUEUED_TIME = Instant.now();
     private static final long SEQUENCE_NUMBER = 32;
     private static final String OFFSET = "64";
@@ -156,6 +161,13 @@ public class GetAzureEventHubTest {
         testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
     }
 
+    private void configureEventHubOAuthTokenProvider() throws 
InitializationException {
+        final MockOAuth2AccessTokenProvider provider = new 
MockOAuth2AccessTokenProvider();
+        testRunner.addControllerService(EVENT_HUB_OAUTH_SERVICE_ID, provider);
+        testRunner.enableControllerService(provider);
+        
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, 
EVENT_HUB_OAUTH_SERVICE_ID);
+    }
+
     @Test
     public void testPropertiesManagedIdentity() {
         testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, 
EVENT_HUB_NAME);
@@ -166,6 +178,19 @@ public class GetAzureEventHubTest {
         testRunner.assertValid();
     }
 
+    @Test
+    public void testEventHubOAuthRequiresTokenProvider() throws 
InitializationException {
+        testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, 
EVENT_HUB_NAME);
+        testRunner.setProperty(GetAzureEventHub.NAMESPACE, 
EVENT_HUB_NAMESPACE);
+        testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY, 
AzureEventHubAuthenticationStrategy.OAUTH2.getValue());
+
+        testRunner.assertNotValid();
+
+        configureEventHubOAuthTokenProvider();
+
+        testRunner.assertValid();
+    }
+
     @Test
     public void testRunNoEventsReceived() {
         setProperties();
@@ -265,7 +290,6 @@ public class GetAzureEventHubTest {
     }
 
     private static class MockEventData extends EventData {
-
         private MockEventData() {
             super(CONTENT);
         }
@@ -285,4 +309,14 @@ public class GetAzureEventHubTest {
             return ENQUEUED_TIME;
         }
     }
+
+    private static class MockOAuth2AccessTokenProvider extends 
AbstractControllerService implements OAuth2AccessTokenProvider {
+        @Override
+        public AccessToken getAccessDetails() {
+            final AccessToken accessToken = new AccessToken();
+            accessToken.setAccessToken("access-token");
+            accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5));
+            return accessToken;
+        }
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 25a0318e47..777743ab04 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -18,6 +18,9 @@ package org.apache.nifi.processors.azure.eventhub;
 
 import com.azure.messaging.eventhubs.EventHubProducerClient;
 import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
 import org.apache.nifi.proxy.ProxyConfiguration;
@@ -41,6 +44,7 @@ import java.net.Proxy;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +64,7 @@ public class PutAzureEventHubTest {
     private static final String PARTITION_KEY_ATTRIBUTE_NAME = 
"eventPartitionKey";
     private static final String PARTITION_KEY = "partition";
     private static final String CONTENT = String.class.getSimpleName();
+    private static final String EVENT_HUB_OAUTH_SERVICE_ID = 
"put-event-hub-oauth";
 
     @Mock
     EventHubProducerClient eventHubProducerClient;
@@ -147,6 +152,13 @@ public class PutAzureEventHubTest {
         testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
     }
 
+    private void configureEventHubOAuthTokenProvider() throws 
InitializationException {
+        final MockOAuth2AccessTokenProvider provider = new 
MockOAuth2AccessTokenProvider();
+        testRunner.addControllerService(EVENT_HUB_OAUTH_SERVICE_ID, provider);
+        testRunner.enableControllerService(provider);
+        
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, 
EVENT_HUB_OAUTH_SERVICE_ID);
+    }
+
     @Test
     public void testPropertiesManagedIdentityEnabled() {
         testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, 
EVENT_HUB_NAME);
@@ -157,6 +169,19 @@ public class PutAzureEventHubTest {
         testRunner.assertValid();
     }
 
+    @Test
+    public void testEventHubOAuthRequiresTokenProvider() throws 
InitializationException {
+        testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, 
EVENT_HUB_NAME);
+        testRunner.setProperty(PutAzureEventHub.NAMESPACE, 
EVENT_HUB_NAMESPACE);
+        testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY, 
AzureEventHubAuthenticationStrategy.OAUTH2.getValue());
+
+        testRunner.assertNotValid();
+
+        configureEventHubOAuthTokenProvider();
+
+        testRunner.assertValid();
+    }
+
     @Test
     public void testRunNoFlowFiles() {
         setProperties();
@@ -228,7 +253,6 @@ public class PutAzureEventHubTest {
     }
 
     private class MockPutAzureEventHub extends PutAzureEventHub {
-
         @Override
         protected EventHubProducerClient createEventHubProducerClient(final 
ProcessContext context) {
             return eventHubProducerClient;
@@ -243,4 +267,14 @@ public class PutAzureEventHubTest {
         testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY, 
POLICY_KEY);
         testRunner.assertValid();
     }
+
+    private static class MockOAuth2AccessTokenProvider extends 
AbstractControllerService implements OAuth2AccessTokenProvider {
+        @Override
+        public AccessToken getAccessDetails() {
+            final AccessToken accessToken = new AccessToken();
+            accessToken.setAccessToken("access-token");
+            accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5));
+            return accessToken;
+        }
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 1a7a41f76b..32aed258e2 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -22,7 +22,10 @@ import com.azure.messaging.eventhubs.EventProcessorClient;
 import com.azure.messaging.eventhubs.models.Checkpoint;
 import com.azure.messaging.eventhubs.models.EventBatchContext;
 import com.azure.messaging.eventhubs.models.PartitionContext;
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStrategy;
 import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@@ -68,6 +71,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -100,6 +104,8 @@ public class TestConsumeAzureEventHub {
     private static final String FOURTH_CONTENT = "CONTENT-4";
     private static final String APPLICATION_PROPERTY = "application";
     private static final String APPLICATION_ATTRIBUTE_NAME = 
String.format("eventhub.property.%s", APPLICATION_PROPERTY);
+    private static final String EVENT_HUB_OAUTH_SERVICE_ID = "eventHubOauth";
+    private static final String BLOB_OAUTH_SERVICE_ID = "blobOauth";
 
     private static final String EXPECTED_TRANSIT_URI = 
String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s",
             EVENT_HUB_NAMESPACE,
@@ -198,6 +204,32 @@ public class TestConsumeAzureEventHub {
         testRunner.assertNotValid();
     }
 
+    @Test
+    public void 
testProcessorConfigValidityWithEventHubOAuthRequiresTokenProvider() throws 
InitializationException {
+        testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, 
EVENT_HUB_NAME);
+        testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, 
EVENT_HUB_NAMESPACE);
+        testRunner.setProperty(ConsumeAzureEventHub.CHECKPOINT_STRATEGY, 
CheckpointStrategy.COMPONENT_STATE.getValue());
+        testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY, 
AzureEventHubAuthenticationStrategy.OAUTH2.getValue());
+        testRunner.assertNotValid();
+
+        configureEventHubOAuthTokenProvider();
+
+        testRunner.assertValid();
+    }
+
+    @Test
+    public void 
testProcessorConfigValidityWithBlobOAuthRequiresTokenProvider() throws 
InitializationException {
+        testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, 
EVENT_HUB_NAME);
+        testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, 
EVENT_HUB_NAMESPACE);
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, 
STORAGE_ACCOUNT_NAME);
+        
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY,
 BlobStorageAuthenticationStrategy.OAUTH2.getValue());
+        testRunner.assertNotValid();
+
+        configureBlobOAuthTokenProvider();
+
+        testRunner.assertValid();
+    }
+
     @Test
     public void testProcessorConfigValidityWithTokenSet() throws 
InitializationException {
         testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, 
EVENT_HUB_NAME);
@@ -627,8 +659,21 @@ public class TestConsumeAzureEventHub {
         testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
     }
 
-    private class MockConsumeAzureEventHub extends ConsumeAzureEventHub {
+    private void configureEventHubOAuthTokenProvider() throws 
InitializationException {
+        final MockOAuth2AccessTokenProvider provider = new 
MockOAuth2AccessTokenProvider();
+        testRunner.addControllerService(EVENT_HUB_OAUTH_SERVICE_ID, provider);
+        testRunner.enableControllerService(provider);
+        
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER,
 EVENT_HUB_OAUTH_SERVICE_ID);
+    }
 
+    private void configureBlobOAuthTokenProvider() throws 
InitializationException {
+        final MockOAuth2AccessTokenProvider provider = new 
MockOAuth2AccessTokenProvider();
+        testRunner.addControllerService(BLOB_OAUTH_SERVICE_ID, provider);
+        testRunner.enableControllerService(provider);
+        
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER,
 BLOB_OAUTH_SERVICE_ID);
+    }
+
+    private class MockConsumeAzureEventHub extends ConsumeAzureEventHub {
         @Override
         protected EventProcessorClient createClient(final ProcessContext 
context) {
             return eventProcessorClient;
@@ -639,4 +684,14 @@ public class TestConsumeAzureEventHub {
             return EXPECTED_TRANSIT_URI;
         }
     }
+
+    private static class MockOAuth2AccessTokenProvider extends 
AbstractControllerService implements OAuth2AccessTokenProvider {
+        @Override
+        public AccessToken getAccessDetails() {
+            final AccessToken accessToken = new AccessToken();
+            accessToken.setAccessToken("access-token");
+            accessToken.setExpiresIn(TimeUnit.MINUTES.toSeconds(5));
+            return accessToken;
+        }
+    }
 }

Reply via email to