This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d41f22491b NIFI-15205 Fixed migration logic for new Authentication
Strategy properties in EventHub components (#10514)
d41f22491b is described below
commit d41f22491bdcab92201f82110d43feac8a90cb10
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Nov 12 23:19:41 2025 +0100
NIFI-15205 Fixed migration logic for new Authentication Strategy properties
in EventHub components (#10514)
Signed-off-by: David Handermann <[email protected]>
---
.../azure/eventhub/ConsumeAzureEventHub.java | 46 +++++++++++++++-------
.../azure/eventhub/GetAzureEventHub.java | 31 +++++++++++----
.../azure/eventhub/PutAzureEventHub.java | 31 +++++++++++----
.../azure/eventhub/utils/AzureEventHubUtils.java | 2 +-
.../azure/eventhub/GetAzureEventHubTest.java | 33 +++++++++++++++-
.../azure/eventhub/PutAzureEventHubTest.java | 33 +++++++++++++++-
.../azure/eventhub/TestConsumeAzureEventHub.java | 44 ++++++++++++++++++++-
7 files changed, 188 insertions(+), 32 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 3f2cfb7a36..2d9811198d 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -96,6 +96,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -369,31 +370,48 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
config.renameProperty("storage-sas-token",
STORAGE_SAS_TOKEN.getName());
config.renameProperty("storage-container-name",
STORAGE_CONTAINER_NAME.getName());
config.renameProperty("event-hub-shared-access-policy-primary-key",
POLICY_PRIMARY_KEY.getName());
-
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
- if
(!config.hasProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName())) {
- final boolean storageAccountKeySet =
config.getPropertyValue(STORAGE_ACCOUNT_KEY.getName())
- .filter(StringUtils::isNotBlank)
- .isPresent();
- final boolean storageSasTokenSet =
config.getPropertyValue(STORAGE_SAS_TOKEN.getName())
- .filter(StringUtils::isNotBlank)
- .isPresent();
+
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ final Optional<String> blobAuthenticationStrategyValue =
config.getRawPropertyValue(BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName())
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank);
+ final boolean blobAuthenticationStrategyMissing =
blobAuthenticationStrategyValue.isEmpty();
+ final boolean storageAccountKeySet = hasConfiguredValue(config,
STORAGE_ACCOUNT_KEY);
+ final boolean storageSasTokenSet = hasConfiguredValue(config,
STORAGE_SAS_TOKEN);
+
+ if (blobAuthenticationStrategyMissing) {
final String blobStorageAuthenticationStrategyValue =
storageSasTokenSet && !storageAccountKeySet
?
BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue()
:
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getValue();
config.setProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName(),
blobStorageAuthenticationStrategyValue);
}
- if (!config.hasProperty(AUTHENTICATION_STRATEGY.getName())) {
- final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
+
+ final Optional<String> authenticationStrategyValue =
config.getRawPropertyValue(AUTHENTICATION_STRATEGY.getName())
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank);
+ final boolean authenticationStrategyMissing =
authenticationStrategyValue.isEmpty();
+ final boolean legacyManagedIdentityPropertyPresent =
config.hasProperty(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ final boolean sharedAccessCredentialsConfigured =
hasConfiguredValue(config, ACCESS_POLICY_NAME)
+ || hasConfiguredValue(config, POLICY_PRIMARY_KEY);
+
+ if (authenticationStrategyMissing ||
legacyManagedIdentityPropertyPresent) {
+ final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME)
.map(Boolean::parseBoolean)
- .orElse(false);
- final String authenticationStrategyValue = useManagedIdentity
+ .orElse(!sharedAccessCredentialsConfigured);
+ final String derivedAuthenticationStrategy = useManagedIdentity
?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue()
:
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue();
- config.setProperty(AUTHENTICATION_STRATEGY.getName(),
authenticationStrategyValue);
+ config.setProperty(AUTHENTICATION_STRATEGY.getName(),
derivedAuthenticationStrategy);
}
-
config.removeProperty(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
+
+
config.removeProperty(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ }
+
+ private boolean hasConfiguredValue(final PropertyConfiguration config,
final PropertyDescriptor descriptor) {
+ return config.getPropertyValue(descriptor.getName())
+ .filter(StringUtils::isNotBlank)
+ .isPresent();
}
@Override
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 6464316fb1..1e31f383e2 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -317,17 +317,27 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
config.renameProperty("Partition Recivier Fetch Size",
RECEIVER_FETCH_SIZE.getName());
config.renameProperty("Partition Receiver Timeout (millseconds)",
RECEIVER_FETCH_TIMEOUT.getName());
config.renameProperty(AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
POLICY_PRIMARY_KEY.getName());
-
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
- if (!config.hasProperty(AUTHENTICATION_STRATEGY.getName())) {
- final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
+
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+
+ final Optional<String> authenticationStrategyValue =
config.getRawPropertyValue(AUTHENTICATION_STRATEGY.getName())
+ .map(String::trim)
+ .filter(value -> !value.isEmpty());
+ final boolean authenticationStrategyMissing =
authenticationStrategyValue.isEmpty();
+ final boolean legacyManagedIdentityPropertyPresent =
config.hasProperty(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ final boolean sharedAccessCredentialsConfigured =
hasConfiguredValue(config, ACCESS_POLICY)
+ || hasConfiguredValue(config, POLICY_PRIMARY_KEY);
+
+ if (authenticationStrategyMissing ||
legacyManagedIdentityPropertyPresent) {
+ final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME)
.map(Boolean::parseBoolean)
- .orElse(false);
- final String authenticationStrategyValue = useManagedIdentity
+ .orElse(!sharedAccessCredentialsConfigured);
+ final String derivedAuthenticationStrategy = useManagedIdentity
?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue()
:
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue();
- config.setProperty(AUTHENTICATION_STRATEGY.getName(),
authenticationStrategyValue);
+ config.setProperty(AUTHENTICATION_STRATEGY.getName(),
derivedAuthenticationStrategy);
}
-
config.removeProperty(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
+
+
config.removeProperty(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
}
/**
@@ -355,6 +365,13 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
return eventHubConsumerClient.receiveFromPartition(partitionId,
receiverFetchSize, eventPosition, receiverFetchTimeout);
}
+ private boolean hasConfiguredValue(final PropertyConfiguration config,
final PropertyDescriptor descriptor) {
+ return config.getPropertyValue(descriptor.getName())
+ .map(String::trim)
+ .filter(value -> !value.isEmpty())
+ .isPresent();
+ }
+
private void createClient() {
if (isCreateClientEnabled()) {
closeClient();
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index c34aeb5b01..991b19428c 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -59,6 +59,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -192,17 +193,33 @@ public class PutAzureEventHub extends AbstractProcessor
implements AzureEventHub
config.renameProperty("partitioning-key-attribute-name",
PARTITIONING_KEY_ATTRIBUTE_NAME.getName());
config.renameProperty("max-batch-size", MAX_BATCH_SIZE.getName());
config.renameProperty(AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
POLICY_PRIMARY_KEY.getName());
-
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
- if (!config.hasProperty(AUTHENTICATION_STRATEGY.getName())) {
- final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
+
config.renameProperty(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+
+ final Optional<String> authenticationStrategyValue =
config.getRawPropertyValue(AUTHENTICATION_STRATEGY.getName())
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank);
+ final boolean authenticationStrategyMissing =
authenticationStrategyValue.isEmpty();
+ final boolean legacyManagedIdentityPropertyPresent =
config.hasProperty(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ final boolean sharedAccessCredentialsConfigured =
hasConfiguredValue(config, ACCESS_POLICY)
+ || hasConfiguredValue(config, POLICY_PRIMARY_KEY);
+
+ if (authenticationStrategyMissing ||
legacyManagedIdentityPropertyPresent) {
+ final boolean useManagedIdentity =
config.getPropertyValue(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME)
.map(Boolean::parseBoolean)
- .orElse(false);
- final String authenticationStrategyValue = useManagedIdentity
+ .orElse(!sharedAccessCredentialsConfigured);
+ final String derivedAuthenticationStrategy = useManagedIdentity
?
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue()
:
AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue();
- config.setProperty(AUTHENTICATION_STRATEGY.getName(),
authenticationStrategyValue);
+ config.setProperty(AUTHENTICATION_STRATEGY.getName(),
derivedAuthenticationStrategy);
}
-
config.removeProperty(AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME);
+
+
config.removeProperty(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME);
+ }
+
+ private boolean hasConfiguredValue(final PropertyConfiguration config,
final PropertyDescriptor descriptor) {
+ return config.getPropertyValue(descriptor.getName())
+ .filter(StringUtils::isNotBlank)
+ .isPresent();
}
protected EventHubProducerClient createEventHubProducerClient(final
ProcessContext context) throws ProcessException {
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index 31f715965e..0c3f657b18 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -46,7 +46,7 @@ public final class AzureEventHubUtils {
public static final AllowableValue AZURE_US_GOV_ENDPOINT = new
AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government",
"Servicebus endpoint for US Government");
public static final String OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME =
"Shared Access Policy Primary Key";
public static final String OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME =
"use-managed-identity";
- public static final String USE_MANAGED_IDENTITY_PROPERTY_NAME = "Use Azure
Managed Identity";
+ public static final String LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME =
"Use Azure Managed Identity";
public static final PropertyDescriptor POLICY_PRIMARY_KEY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Key")
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index b6588332e8..1a7ffc5f67 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -30,6 +30,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyConfiguration;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -41,6 +42,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -108,11 +110,40 @@ public class GetAzureEventHubTest {
"Partition Recivier Fetch Size",
GetAzureEventHub.RECEIVER_FETCH_SIZE.getName(),
"Partition Receiver Timeout (millseconds)",
GetAzureEventHub.RECEIVER_FETCH_TIMEOUT.getName(),
AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
GetAzureEventHub.POLICY_PRIMARY_KEY.getName(),
- AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME
+ AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME
);
assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
}
+
+ @Test
+ void testMigrationPreservesSharedAccessAuthentication() {
+ final Map<String, String> properties = new HashMap<>();
+
properties.put(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME,
"false");
+ properties.put(GetAzureEventHub.ACCESS_POLICY.getName(), POLICY_NAME);
+ properties.put(GetAzureEventHub.POLICY_PRIMARY_KEY.getName(),
POLICY_KEY);
+ properties.put(GetAzureEventHub.AUTHENTICATION_STRATEGY.getName(),
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(properties);
+ new GetAzureEventHub().migrateProperties(configuration);
+
+
assertEquals(AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue(),
+
configuration.getRawProperties().get(GetAzureEventHub.AUTHENTICATION_STRATEGY.getName()));
+ }
+
+ @Test
+ void testMigrationDoesNotOverrideExplicitAuthenticationStrategy() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put(GetAzureEventHub.AUTHENTICATION_STRATEGY.getName(),
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
+ properties.put(GetAzureEventHub.ACCESS_POLICY.getName(), POLICY_NAME);
+ properties.put(GetAzureEventHub.POLICY_PRIMARY_KEY.getName(),
POLICY_KEY);
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(properties);
+ new GetAzureEventHub().migrateProperties(configuration);
+
+
assertEquals(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue(),
+
configuration.getRawProperties().get(GetAzureEventHub.AUTHENTICATION_STRATEGY.getName()));
+ }
private void configureProxyControllerService() throws
InitializationException {
final String serviceId = "proxyConfigurationService";
final ProxyConfiguration proxyConfiguration =
mock(ProxyConfiguration.class);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 28e0e6f980..25a0318e47 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -25,6 +25,7 @@ import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
+import org.apache.nifi.util.MockPropertyConfiguration;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -38,6 +39,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.net.Proxy;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import static
org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
@@ -98,12 +100,41 @@ public class PutAzureEventHubTest {
"partitioning-key-attribute-name",
PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME.getName(),
"max-batch-size", PutAzureEventHub.MAX_BATCH_SIZE.getName(),
AzureEventHubUtils.OLD_POLICY_PRIMARY_KEY_DESCRIPTOR_NAME,
PutAzureEventHub.POLICY_PRIMARY_KEY.getName(),
- AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME
+ AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME
);
assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
}
+ @Test
+ void testMigrationPreservesSharedAccessAuthentication() {
+ final Map<String, String> properties = new HashMap<>();
+
properties.put(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME,
"false");
+ properties.put(PutAzureEventHub.ACCESS_POLICY.getName(), POLICY_NAME);
+ properties.put(PutAzureEventHub.POLICY_PRIMARY_KEY.getName(),
POLICY_KEY);
+ properties.put(PutAzureEventHub.AUTHENTICATION_STRATEGY.getName(),
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(properties);
+ new PutAzureEventHub().migrateProperties(configuration);
+
+
assertEquals(AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue(),
+
configuration.getRawProperties().get(PutAzureEventHub.AUTHENTICATION_STRATEGY.getName()));
+ }
+
+ @Test
+ void testMigrationDoesNotOverrideExplicitAuthenticationStrategy() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put(PutAzureEventHub.AUTHENTICATION_STRATEGY.getName(),
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
+ properties.put(PutAzureEventHub.ACCESS_POLICY.getName(), POLICY_NAME);
+ properties.put(PutAzureEventHub.POLICY_PRIMARY_KEY.getName(),
POLICY_KEY);
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(properties);
+ new PutAzureEventHub().migrateProperties(configuration);
+
+
assertEquals(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue(),
+
configuration.getRawProperties().get(PutAzureEventHub.AUTHENTICATION_STRATEGY.getName()));
+ }
+
private void configureProxyControllerService() throws
InitializationException {
final String serviceId = "proxyConfigurationService";
final ProxyConfiguration proxyConfiguration =
mock(ProxyConfiguration.class);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 42046ca52e..1a7a41f76b 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -48,6 +48,7 @@ import
org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrateg
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import
org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyConfiguration;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -446,7 +447,7 @@ public class TestConsumeAzureEventHub {
Map.entry("storage-sas-token",
ConsumeAzureEventHub.STORAGE_SAS_TOKEN.getName()),
Map.entry("storage-container-name",
ConsumeAzureEventHub.STORAGE_CONTAINER_NAME.getName()),
Map.entry("event-hub-shared-access-policy-primary-key",
ConsumeAzureEventHub.POLICY_PRIMARY_KEY.getName()),
-
Map.entry(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.USE_MANAGED_IDENTITY_PROPERTY_NAME)
+
Map.entry(AzureEventHubUtils.OLD_USE_MANAGED_IDENTITY_DESCRIPTOR_NAME,
AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME)
);
assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
@@ -455,6 +456,47 @@ public class TestConsumeAzureEventHub {
"Blob Storage Authentication Strategy should be initialized
during migration");
}
+ @Test
+ void testMigrationPreservesSharedAccessAuthentication() {
+ final Map<String, String> properties = new HashMap<>();
+
properties.put(AzureEventHubUtils.LEGACY_USE_MANAGED_IDENTITY_PROPERTY_NAME,
"false");
+ properties.put(ConsumeAzureEventHub.ACCESS_POLICY_NAME.getName(),
POLICY_NAME);
+ properties.put(ConsumeAzureEventHub.POLICY_PRIMARY_KEY.getName(),
POLICY_KEY);
+ properties.put(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY.getName(),
AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getValue());
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(properties);
+ new ConsumeAzureEventHub().migrateProperties(configuration);
+
+
assertEquals(AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue(),
+
configuration.getRawProperties().get(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY.getName()));
+ }
+
+ @Test
+ void testMigrationDerivesBlobAuthenticationStrategyFromSasToken() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put(ConsumeAzureEventHub.STORAGE_SAS_TOKEN.getName(),
STORAGE_TOKEN);
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(properties);
+ new ConsumeAzureEventHub().migrateProperties(configuration);
+
+
assertEquals(BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getValue(),
+
configuration.getRawProperties().get(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName()));
+ }
+
+ @Test
+ void testDoesNotOverrideBlobAuthenticationWhenExplicitlyConfigured() {
+ final Map<String, String> properties = new HashMap<>();
+
properties.put(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName(),
BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getValue());
+ properties.put(ConsumeAzureEventHub.STORAGE_SAS_TOKEN.getName(),
STORAGE_TOKEN);
+ properties.put(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY.getName(),
STORAGE_ACCOUNT_KEY);
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(properties);
+ new ConsumeAzureEventHub().migrateProperties(configuration);
+
+
assertEquals(BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getValue(),
+
configuration.getRawProperties().get(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY.getName()));
+ }
+
private void setProperties() {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,
EVENT_HUB_NAME);
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,
EVENT_HUB_NAMESPACE);