This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c381f0f76f NIFI-15178 Added Authentication Strategy properties to
Azure Event Hubs Components (#10494)
c381f0f76f is described below
commit c381f0f76febe274e01f8a36c95a135ee027a2c5
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Nov 4 20:54:43 2025 +0100
NIFI-15178 Added Authentication Strategy properties to Azure Event Hubs
Components (#10494)
Signed-off-by: David Handermann <[email protected]>
---
.../azure/eventhub/ConsumeAzureEventHub.java | 123 ++++++++++++++++-----
.../azure/eventhub/GetAzureEventHub.java | 23 +++-
.../azure/eventhub/PutAzureEventHub.java | 23 +++-
.../azure/eventhub/utils/AzureEventHubUtils.java | 28 ++---
.../AzureEventHubAuthenticationStrategy.java | 50 +++++++++
.../azure/eventhubs/AzureEventHubComponent.java | 8 ++
.../BlobStorageAuthenticationStrategy.java | 47 ++++++++
.../azure/eventhub/GetAzureEventHubTest.java | 10 +-
.../azure/eventhub/PutAzureEventHubTest.java | 12 +-
.../azure/eventhub/TestConsumeAzureEventHub.java | 25 ++++-
10 files changed, 290 insertions(+), 59 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 5c912d681d..3f2cfb7a36 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -78,8 +78,10 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
+import
org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@@ -143,15 +145,16 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.required(true)
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
+ static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor ACCESS_POLICY_NAME = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY =
AzureEventHubUtils.POLICY_PRIMARY_KEY;
- static final PropertyDescriptor USE_MANAGED_IDENTITY =
AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor CONSUMER_GROUP = new
PropertyDescriptor.Builder()
.name("Consumer Group")
.description("The name of the consumer group to use.")
@@ -244,6 +247,15 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.required(true)
.dependsOn(CHECKPOINT_STRATEGY,
CheckpointStrategy.AZURE_BLOB_STORAGE)
.build();
+ static final PropertyDescriptor BLOB_STORAGE_AUTHENTICATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Blob Storage Authentication Strategy")
+ .description("Authentication strategy used to access Azure Blob
Storage when persisting checkpoints.")
+ .allowableValues(BlobStorageAuthenticationStrategy.class)
+
.defaultValue(BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getValue())
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .dependsOn(CHECKPOINT_STRATEGY,
CheckpointStrategy.AZURE_BLOB_STORAGE)
+ .build();
static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new
PropertyDescriptor.Builder()
.name("Storage Account Key")
.description("The Azure Storage account key to store event hub
consumer group state.")
@@ -251,7 +263,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
- .dependsOn(CHECKPOINT_STRATEGY,
CheckpointStrategy.AZURE_BLOB_STORAGE)
+ .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY)
.build();
static final PropertyDescriptor STORAGE_SAS_TOKEN = new
PropertyDescriptor.Builder()
.name("Storage SAS Token")
@@ -261,7 +273,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
"Token must start with a ? character."))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
- .dependsOn(CHECKPOINT_STRATEGY,
CheckpointStrategy.AZURE_BLOB_STORAGE)
+ .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE)
.build();
static final PropertyDescriptor STORAGE_CONTAINER_NAME = new
PropertyDescriptor.Builder()
.name("Storage Container Name")
@@ -301,7 +313,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
TRANSPORT_TYPE,
ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY,
- USE_MANAGED_IDENTITY,
+ AUTHENTICATION_STRATEGY,
CONSUMER_GROUP,
RECORD_READER,
RECORD_WRITER,
@@ -311,9 +323,10 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
RECEIVE_TIMEOUT,
CHECKPOINT_STRATEGY,
STORAGE_ACCOUNT_NAME,
+ STORAGE_CONTAINER_NAME,
+ BLOB_STORAGE_AUTHENTICATION_STRATEGY,
STORAGE_ACCOUNT_KEY,
STORAGE_SAS_TOKEN,
- STORAGE_CONTAINER_NAME,
PROXY_CONFIGURATION_SERVICE
);
@@ -356,7 +369,31 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
config.renameProperty("storage-sas-token",
STORAGE_SAS_TOKEN.getName());
config.renameProperty("storage-container-name",
STORAGE_CONTAINER_NAME.getName());
config.renameProperty("event-hub-shared-access-policy-primary-key",
POLICY_PRIMARY_KEY.getName());
-
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
USE_MANAGED_IDENTITY.getName());
+
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ if
(!config.hasProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName())) {
+ final boolean storageAccountKeySet =
config.getPropertyValue(STORAGE_ACCOUNT_KEY.getName())
+ .filter(StringUtils::isNotBlank)
+ .isPresent();
+ final boolean storageSasTokenSet =
config.getPropertyValue(STORAGE_SAS_TOKEN.getName())
+ .filter(StringUtils::isNotBlank)
+ .isPresent();
+
+ final String blobStorageAuthenticationStrategyValue =
storageSasTokenSet && !storageAccountKeySet
+ ?
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue()
+ :
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getValue();
+
+ config.setProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName(),
blobStorageAuthenticationStrategyValue);
+ }
+ if (!config.hasProperty(AUTHENTICATION_STRATEGY.getName())) {
+ final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
+ .map(Boolean::parseBoolean)
+ .orElse(false);
+ final String authenticationStrategyValue = useManagedIdentity
+ ?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue()
+ :
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue();
+ config.setProperty(AUTHENTICATION_STRATEGY.getName(),
authenticationStrategyValue);
+ }
+
config.removeProperty(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
}
@Override
@@ -378,24 +415,54 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
}
if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
- if (StringUtils.isBlank(storageAccountKey) &&
StringUtils.isBlank(storageSasToken)) {
- results.add(new ValidationResult.Builder()
- .subject(String.format("%s or %s",
- STORAGE_ACCOUNT_KEY.getDisplayName(),
STORAGE_SAS_TOKEN.getDisplayName()))
- .explanation(String.format("either %s or %s should be
set.",
- STORAGE_ACCOUNT_KEY.getDisplayName(),
STORAGE_SAS_TOKEN.getDisplayName()))
- .valid(false)
- .build());
- }
+ final BlobStorageAuthenticationStrategy
blobStorageAuthenticationStrategy =
+
validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY)
+
.asAllowableValue(BlobStorageAuthenticationStrategy.class);
+
+ if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) {
+ if (StringUtils.isBlank(storageAccountKey)) {
+ results.add(new ValidationResult.Builder()
+ .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
+ .explanation(String.format("%s must be set when %s
is %s.",
+ STORAGE_ACCOUNT_KEY.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
+ .valid(false)
+ .build());
+ }
+
+ if (StringUtils.isNotBlank(storageSasToken)) {
+ results.add(new ValidationResult.Builder()
+ .subject(STORAGE_SAS_TOKEN.getDisplayName())
+ .explanation(String.format("%s must not be set
when %s is %s.",
+ STORAGE_SAS_TOKEN.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName()))
+ .valid(false)
+ .build());
+ }
+ } else if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE) {
+ if (StringUtils.isBlank(storageSasToken)) {
+ results.add(new ValidationResult.Builder()
+ .subject(STORAGE_SAS_TOKEN.getDisplayName())
+ .explanation(String.format("%s must be set when %s
is %s.",
+ STORAGE_SAS_TOKEN.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+ .valid(false)
+ .build());
+ }
- if (StringUtils.isNotBlank(storageAccountKey) &&
StringUtils.isNotBlank(storageSasToken)) {
- results.add(new ValidationResult.Builder()
- .subject(String.format("%s or %s",
- STORAGE_ACCOUNT_KEY.getDisplayName(),
STORAGE_SAS_TOKEN.getDisplayName()))
- .explanation(String.format("%s and %s should not be
set at the same time.",
- STORAGE_ACCOUNT_KEY.getDisplayName(),
STORAGE_SAS_TOKEN.getDisplayName()))
- .valid(false)
- .build());
+ if (StringUtils.isNotBlank(storageAccountKey)) {
+ results.add(new ValidationResult.Builder()
+ .subject(STORAGE_ACCOUNT_KEY.getDisplayName())
+ .explanation(String.format("%s must not be set
when %s is %s.",
+ STORAGE_ACCOUNT_KEY.getDisplayName(),
+
BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(),
+
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName()))
+ .valid(false)
+ .build());
+ }
}
}
results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY, validationContext));
@@ -508,8 +575,9 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
.processError(errorProcessor)
.processEventBatch(eventBatchProcessor, maxBatchSize,
maxWaitTime);
- final boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
- if (useManagedIdentity) {
+ final AzureEventHubAuthenticationStrategy authenticationStrategy =
+
context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class);
+ if (authenticationStrategy == null || authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
eventProcessorClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
@@ -752,10 +820,13 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
final String domainName = serviceBusEndpoint.replace(".servicebus.",
"");
final String storageAccountKey =
context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken =
context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+ final BlobStorageAuthenticationStrategy
blobStorageAuthenticationStrategy =
+
context.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class);
- if (storageAccountKey != null) {
+ if (blobStorageAuthenticationStrategy ==
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) {
return
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY,
storageAccountName, storageAccountKey, domainName);
}
+
return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN,
storageAccountName, domainName, storageSasToken);
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 3918907756..6464316fb1 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -55,6 +55,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.scheduling.ExecutionNode;
+import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.StopWatch;
@@ -111,15 +112,16 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
.required(true)
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
+ static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY =
AzureEventHubUtils.POLICY_PRIMARY_KEY;
- static final PropertyDescriptor USE_MANAGED_IDENTITY =
AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor CONSUMER_GROUP = new
PropertyDescriptor.Builder()
.name("Consumer Group")
@@ -165,7 +167,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
TRANSPORT_TYPE,
ACCESS_POLICY,
POLICY_PRIMARY_KEY,
- USE_MANAGED_IDENTITY,
+ AUTHENTICATION_STRATEGY,
CONSUMER_GROUP,
ENQUEUE_TIME,
RECEIVER_FETCH_SIZE,
@@ -315,7 +317,17 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
config.renameProperty("Partition Recivier Fetch Size",
RECEIVER_FETCH_SIZE.getName());
config.renameProperty("Partition Receiver Timeout (millseconds)",
RECEIVER_FETCH_TIMEOUT.getName());
config.renameProperty(AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
POLICY_PRIMARY_KEY.getName());
-
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
USE_MANAGED_IDENTITY.getName());
+
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ if (!config.hasProperty(AUTHENTICATION_STRATEGY.getName())) {
+ final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
+ .map(Boolean::parseBoolean)
+ .orElse(false);
+ final String authenticationStrategyValue = useManagedIdentity
+ ?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue()
+ :
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue();
+ config.setProperty(AUTHENTICATION_STRATEGY.getName(),
authenticationStrategyValue);
+ }
+
config.removeProperty(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
}
/**
@@ -370,7 +382,8 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
- final boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ final AzureEventHubAuthenticationStrategy authenticationStrategy =
+
context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class);
final String fullyQualifiedNamespace = String.format("%s%s",
namespace, serviceBusEndpoint);
final AmqpTransportType transportType =
context.getProperty(TRANSPORT_TYPE).asAllowableValue(AzureEventHubTransportType.class).asAmqpTransportType();
@@ -380,7 +393,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
final String consumerGroup =
context.getProperty(CONSUMER_GROUP).getValue();
eventHubClientBuilder.consumerGroup(consumerGroup);
- if (useManagedIdentity) {
+ if (authenticationStrategy == null || authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index e5748b8548..c34aeb5b01 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -48,6 +48,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
+import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.stream.io.StreamUtils;
@@ -83,15 +84,16 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
.required(true)
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
+ static final PropertyDescriptor AUTHENTICATION_STRATEGY =
AzureEventHubComponent.AUTHENTICATION_STRATEGY;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Send claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY =
AzureEventHubUtils.POLICY_PRIMARY_KEY;
- static final PropertyDescriptor USE_MANAGED_IDENTITY =
AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new
PropertyDescriptor.Builder()
.name("Partitioning Key Attribute Name")
@@ -125,7 +127,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
TRANSPORT_TYPE,
ACCESS_POLICY,
POLICY_PRIMARY_KEY,
- USE_MANAGED_IDENTITY,
+ AUTHENTICATION_STRATEGY,
PARTITIONING_KEY_ATTRIBUTE_NAME,
MAX_BATCH_SIZE,
PROXY_CONFIGURATION_SERVICE
@@ -190,11 +192,22 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
config.renameProperty("partitioning-key-attribute-name",
PARTITIONING_KEY_ATTRIBUTE_NAME.getName());
config.renameProperty("max-batch-size", MAX_BATCH_SIZE.getName());
config.renameProperty(AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
POLICY_PRIMARY_KEY.getName());
-
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
USE_MANAGED_IDENTITY.getName());
+
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ if (!config.hasProperty(AUTHENTICATION_STRATEGY.getName())) {
+ final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
+ .map(Boolean::parseBoolean)
+ .orElse(false);
+ final String authenticationStrategyValue = useManagedIdentity
+ ?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue()
+ :
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue();
+ config.setProperty(AUTHENTICATION_STRATEGY.getName(),
authenticationStrategyValue);
+ }
+
config.removeProperty(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
}
protected EventHubProducerClient createEventHubProducerClient(final
ProcessContext context) throws ProcessException {
- final boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ final AzureEventHubAuthenticationStrategy authenticationStrategy =
+
context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class);
final String namespace = context.getProperty(NAMESPACE).getValue();
final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
@@ -205,7 +218,7 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
eventHubClientBuilder.transportType(transportType);
final String fullyQualifiedNamespace = String.format("%s%s",
namespace, serviceBusEndpoint);
- if (useManagedIdentity) {
+ if (authenticationStrategy == null || authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY) {
final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index a71cb058b1..31f715965e 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -26,6 +26,7 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
+import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
@@ -45,6 +46,7 @@ public final class AzureEventHubUtils {
public static final AllowableValue AZURE_US_GOV_ENDPOINT = new
AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government",
"Servicebus endpoint for US Government");
public static final String OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME =
"Shared Access Policy Primary Key";
public static final String OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME =
"use-managed-identity";
+ public static final String USE_MANAGED_IDENTITY_PROPERTY_NAME = "Use Azure
Managed Identity";
public static final PropertyDescriptor POLICY_PRIMARY_KEY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Key")
@@ -53,14 +55,9 @@ public final class AzureEventHubUtils {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
.required(false)
+ .dependsOn(AzureEventHubComponent.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE)
.build();
- public static final PropertyDescriptor USE_MANAGED_IDENTITY = new
PropertyDescriptor.Builder()
- .name("Use Azure Managed Identity")
- .description("Choose whether or not to use the managed identity of
Azure VM/VMSS")
- .required(true).defaultValue("false").allowableValues("true",
"false")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
-
public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new
PropertyDescriptor.Builder()
.name("Service Bus Endpoint")
.description("To support namespaces not in the default windows.net
domain.")
@@ -78,20 +75,25 @@ public final class AzureEventHubUtils {
boolean accessPolicyIsSet =
context.getProperty(accessPolicyDescriptor).isSet();
boolean policyKeyIsSet =
context.getProperty(policyKeyDescriptor).isSet();
- boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ final AzureEventHubAuthenticationStrategy authenticationStrategy =
Optional.ofNullable(
+
context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY)
+
.asAllowableValue(AzureEventHubAuthenticationStrategy.class))
+ .orElse(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY);
- if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet)) {
+ if (authenticationStrategy ==
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY && (accessPolicyIsSet ||
policyKeyIsSet)) {
final String msg = String.format(
- "('%s') and ('%s' with '%s') fields cannot be set at the
same time.",
- USE_MANAGED_IDENTITY.getDisplayName(),
+ "When '%s' is set to '%s', '%s' and '%s' must not be set.",
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
);
validationResults.add(new
ValidationResult.Builder().subject("Credentials
config").valid(false).explanation(msg).build());
- } else if (!useManagedIdentity && (!accessPolicyIsSet ||
!policyKeyIsSet)) {
+ } else if (authenticationStrategy ==
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE &&
(!accessPolicyIsSet || !policyKeyIsSet)) {
final String msg = String.format(
- "either('%s') or (%s with '%s') must be set",
- USE_MANAGED_IDENTITY.getDisplayName(),
+ "When '%s' is set to '%s', both '%s' and '%s' must be set",
+
AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(),
+
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
new file mode 100644
index 0000000000..654871914c
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.shared.azure.eventhubs;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Strategies supported for authenticating to Azure Event Hubs.
+ */
+public enum AzureEventHubAuthenticationStrategy implements DescribedValue {
+ MANAGED_IDENTITY("Managed Identity", "Authenticate using the Managed
Identity of the hosting Azure resource."),
+ SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the
Shared Access Policy name and key.");
+
+ private final String displayName;
+ private final String description;
+
+ AzureEventHubAuthenticationStrategy(final String displayName, final String
description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
index 4ad5137159..4da423cb3a 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java
@@ -35,6 +35,14 @@ public interface AzureEventHubComponent {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
+ PropertyDescriptor AUTHENTICATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Authentication Strategy")
+ .description("Specifies the strategy used for authenticating to
Azure Event Hubs")
+ .allowableValues(AzureEventHubAuthenticationStrategy.class)
+
.defaultValue(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH};
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new
PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS))
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
new file mode 100644
index 0000000000..32c945bc96
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.shared.azure.eventhubs;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum BlobStorageAuthenticationStrategy implements DescribedValue {
+ STORAGE_ACCOUNT_KEY("Storage Account Key", "Authenticate to Azure Blob
Storage using the account key."),
+ SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure
Blob Storage using a SAS token.");
+
+ private final String displayName;
+ private final String description;
+
+ BlobStorageAuthenticationStrategy(final String displayName, final String
description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 804274c0ab..b6588332e8 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -27,6 +27,7 @@ import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.scheduling.ExecutionNode;
+import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.PropertyMigrationResult;
@@ -78,6 +79,8 @@ public class GetAzureEventHubTest {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.assertValid();
+ testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.ACCESS_POLICY, POLICY_NAME);
testRunner.assertNotValid();
@@ -105,7 +108,7 @@ public class GetAzureEventHubTest {
"Partition Recivier Fetch Size",
GetAzureEventHub.RECEIVER_FETCH_SIZE.getName(),
"Partition Receiver Timeout (millseconds)",
GetAzureEventHub.RECEIVER_FETCH_TIMEOUT.getName(),
AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
GetAzureEventHub.POLICY_PRIMARY_KEY.getName(),
- AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
GetAzureEventHub.USE_MANAGED_IDENTITY.getName()
+ AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME
);
assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
@@ -127,8 +130,8 @@ public class GetAzureEventHubTest {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
- testRunner.assertNotValid();
- testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,
Boolean.TRUE.toString());
+ testRunner.assertValid();
+ testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
testRunner.assertValid();
}
@@ -224,6 +227,7 @@ public class GetAzureEventHubTest {
private void setProperties() {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.setProperty(GetAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
testRunner.setProperty(GetAzureEventHub.ACCESS_POLICY, POLICY_NAME);
testRunner.setProperty(GetAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertValid();
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index b482fbb278..28e0e6f980 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -23,6 +23,7 @@ import
org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
+import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
@@ -76,6 +77,8 @@ public class PutAzureEventHubTest {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.assertValid();
+ testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.ACCESS_POLICY, POLICY_NAME);
testRunner.assertNotValid();
@@ -95,7 +98,7 @@ public class PutAzureEventHubTest {
"partitioning-key-attribute-name",
PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME.getName(),
"max-batch-size", PutAzureEventHub.MAX_BATCH_SIZE.getName(),
AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
PutAzureEventHub.POLICY_PRIMARY_KEY.getName(),
- AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
PutAzureEventHub.USE_MANAGED_IDENTITY.getName()
+ AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME
);
assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
@@ -118,8 +121,8 @@ public class PutAzureEventHubTest {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
- testRunner.assertNotValid();
- testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,
Boolean.TRUE.toString());
+ testRunner.assertValid();
+ testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
testRunner.assertValid();
}
@@ -204,8 +207,9 @@ public class PutAzureEventHubTest {
private void setProperties() {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.setProperty(PutAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
testRunner.setProperty(PutAzureEventHub.ACCESS_POLICY, POLICY_NAME);
testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertValid();
}
-}
\ No newline at end of file
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 2177ff7ecd..42046ca52e 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -44,7 +44,9 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
+import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
+import
org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
@@ -72,6 +74,7 @@ import java.util.stream.Stream;
import static
org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
@@ -153,8 +156,8 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_NAME);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY,
STORAGE_ACCOUNT_KEY);
- testRunner.assertNotValid();
- testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,
"true");
+ testRunner.assertValid();
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
testRunner.assertValid();
testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE,
AzureEventHubTransportType.AMQP_WEB_SOCKETS);
testRunner.assertValid();
@@ -168,6 +171,8 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
+ testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME,
POLICY_NAME);
testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertNotValid();
@@ -181,6 +186,8 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
+ testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME,
POLICY_NAME);
testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertNotValid();
@@ -196,10 +203,13 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
+ testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME,
POLICY_NAME);
testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_NAME);
+
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN,
STORAGE_TOKEN);
testRunner.assertValid();
testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE,
AzureEventHubTransportType.AMQP_WEB_SOCKETS);
@@ -214,6 +224,8 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
+ testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME,
POLICY_NAME);
testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertNotValid();
@@ -232,6 +244,8 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
+ testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME,
POLICY_NAME);
testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.assertNotValid();
@@ -432,18 +446,23 @@ public class TestConsumeAzureEventHub {
Map.entry("storage-sas-token",
ConsumeAzureEventHub.STORAGE_SAS_TOKEN.getName()),
Map.entry("storage-container-name",
ConsumeAzureEventHub.STORAGE_CONTAINER_NAME.getName()),
Map.entry("event-hub-shared-access-policy-primary-key",
ConsumeAzureEventHub.POLICY_PRIMARY_KEY.getName()),
-
Map.entry(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
ConsumeAzureEventHub.USE_MANAGED_IDENTITY.getName())
+
Map.entry(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
);
assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
+
assertTrue(propertyMigrationResult.getPropertiesUpdated().contains(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName())
+ || propertyMigrationResult.getPropertiesUpdated().isEmpty(),
+ "Blob Storage Authentication Strategy should be initialized
during migration");
}
private void setProperties() {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);
+ testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY,
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME,
POLICY_NAME);
testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY,
POLICY_KEY);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_NAME);
+
testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY,
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue());
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN,
STORAGE_TOKEN);
when(partitionContext.getEventHubName()).thenReturn(EVENT_HUB_NAME);