This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new aa1e272 NIFI-6149: Azure EventHub Managed identities support patch aa1e272 is described below commit aa1e272052e0c271458ee458a79a1fecbc7f7376 Author: sjyang18 <ilson...@hotmail.com> AuthorDate: Wed Apr 22 18:42:35 2020 +0000 NIFI-6149: Azure EventHub Managed identities support patch review changes additional review changes NIFI-6149: typo fixes This closes #4226. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi-azure-processors/pom.xml | 10 ++- .../azure/eventhub/ConsumeAzureEventHub.java | 44 ++++++----- .../azure/eventhub/GetAzureEventHub.java | 46 +++++++---- .../azure/eventhub/PutAzureEventHub.java | 49 ++++++++---- .../azure/eventhub/utils/AzureEventHubUtils.java | 90 ++++++++++++++++++++++ .../azure/eventhub/GetAzureEventHubTest.java | 18 ++++- .../azure/eventhub/PutAzureEventHubTest.java | 9 +++ .../azure/eventhub/TestConsumeAzureEventHub.java | 34 +++++++- 8 files changed, 246 insertions(+), 54 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 5e909a5..b0e80df 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -20,8 +20,8 @@ <artifactId>nifi-azure-processors</artifactId> <packaging>jar</packaging> <properties> - <azure-eventhubs.version>2.3.2</azure-eventhubs.version> - <azure-eventhubs-eph.version>2.5.2</azure-eventhubs-eph.version> + <azure-eventhubs.version>3.1.1</azure-eventhubs.version> + <azure-eventhubs-eph.version>3.1.1</azure-eventhubs-eph.version> </properties> <dependencies> <dependency> @@ -89,6 +89,12 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <version>1.12.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-processor-utils</artifactId> <version>1.12.0-SNAPSHOT</version> </dependency> diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index f1e3a64..7873e4c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -23,7 +23,6 @@ import com.microsoft.azure.eventprocessorhost.EventProcessorOptions; import com.microsoft.azure.eventprocessorhost.IEventProcessor; import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory; import com.microsoft.azure.eventprocessorhost.PartitionContext; -import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.ReceiverDisconnectedException; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -76,6 +75,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.nifi.util.StringUtils.isEmpty; +import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"}) @CapabilityDescription("Receives messages from Azure Event Hubs, writing the contents of the message to the content of the FlowFile.") @@ -115,17 +115,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { .description("The name of the shared access policy. This policy must have Listen claims.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .required(true) + .required(false) .build(); static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AzureEventHubUtils.POLICY_PRIMARY_KEY) .name("event-hub-shared-access-policy-primary-key") - .displayName("Shared Access Policy Primary Key") - .description("The primary key of the shared access policy.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .sensitive(true) - .required(true) .build(); + static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY; static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder() .name("event-hub-consumer-group") .displayName("Consumer Group") @@ -261,7 +257,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { static { PROPERTIES = Collections.unmodifiableList(Arrays.asList( - NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, CONSUMER_GROUP, CONSUMER_HOSTNAME, + NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME, RECORD_READER, RECORD_WRITER, INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT, STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME @@ -335,6 +331,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { .valid(false) .build()); } + results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext)); return results; } @@ -347,9 +344,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { } } - public class EventProcessorFactory implements IEventProcessorFactory { + public class EventProcessorFactory implements IEventProcessorFactory<EventProcessor> { @Override - public IEventProcessor createEventProcessor(PartitionContext context) throws Exception { + public EventProcessor createEventProcessor(PartitionContext context) throws Exception { final EventProcessor eventProcessor = new EventProcessor(); return eventProcessor; } @@ -581,12 +578,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue(); validateRequiredProperty(EVENT_HUB_NAME, eventHubName); - final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue(); - validateRequiredProperty(ACCESS_POLICY_NAME, sasName); - - final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue(); - validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey); - final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName); @@ -627,9 +618,22 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey); - final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName( eventHubName).setSasKeyName(sasName).setSasKey(sasKey); - - eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName); + final String connectionString; + final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); + if(useManagedIdentity) { + connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, eventHubName); + } else { + final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue(); + validateRequiredProperty(ACCESS_POLICY_NAME, sasName); + final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue(); + validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey); + connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey); + } + eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder + .newBuilder(consumerHostname, consumerGroupName) + .useAzureStorageCheckpointLeaseManager(storageConnectionString, containerName, null) + .useEventHubConnectionString(connectionString, eventHubName) + .build(); options.setExceptionNotification(e -> { getLogger().error("An error occurred while receiving messages from Azure Event Hub {}" + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index cb8c9ac..78a3327 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -22,6 +22,7 @@ import java.net.URISyntaxException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -54,6 +55,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; @@ -63,6 +66,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"}) @CapabilityDescription("Receives messages from Microsoft Azure Event Hubs, writing the contents of the Azure message to the content of the FlowFile. " @@ -103,16 +107,10 @@ public class GetAzureEventHub extends AbstractProcessor { .description("The name of the shared access policy. This policy must have Listen claims.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(true) - .build(); - static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() - .name("Shared Access Policy Primary Key") - .description("The primary key of the shared access policy") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .sensitive(true) - .required(true) + .required(false) .build(); + static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY; + static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY; static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder() .name("Number of Event Hub Partitions") @@ -163,6 +161,7 @@ public class GetAzureEventHub extends AbstractProcessor { .description("Any FlowFile that is successfully received from the event hub will be transferred to this Relationship.") .build(); + private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>(); private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>(); private volatile Instant configuredEnqueueTime; @@ -184,6 +183,7 @@ public class GetAzureEventHub extends AbstractProcessor { _propertyDescriptors.add(NAMESPACE); _propertyDescriptors.add(ACCESS_POLICY); _propertyDescriptors.add(POLICY_PRIMARY_KEY); + _propertyDescriptors.add(USE_MANAGED_IDENTITY); _propertyDescriptors.add(NUM_PARTITIONS); _propertyDescriptors.add(CONSUMER_GROUP); _propertyDescriptors.add(ENQUEUE_TIME); @@ -207,10 +207,16 @@ public class GetAzureEventHub extends AbstractProcessor { return propertyDescriptors; } + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context); + return retVal; + } + protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException { try { - EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2"; - eventHubClient = EventHubClient.createSync(connectionString, executor); + EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1"; + eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString, executor); } catch (IOException | EventHubException e) { throw new ProcessException(e); } @@ -301,11 +307,23 @@ public class GetAzureEventHub extends AbstractProcessor { } this.partitionNames = partitionNames; - final String policyName = context.getProperty(ACCESS_POLICY).getValue(); - final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); + final String connectionString; + + if(useManagedIdentity){ + connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace,eventHubName); + } else { + final String policyName = context.getProperty(ACCESS_POLICY).getValue(); + final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + connectionString = new ConnectionStringBuilder() + .setEndpoint(new URI("amqps://"+namespace+serviceBusEndpoint)) + .setEventHubName(eventHubName) + .setSasKeyName(policyName) + .setSasKey(policyKey).toString(); + } if(context.getProperty(ENQUEUE_TIME).isSet()) { configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString()); @@ -324,8 +342,6 @@ public class GetAzureEventHub extends AbstractProcessor { } executor = Executors.newScheduledThreadPool(4); - final String connectionString = new ConnectionStringBuilder().setEndpoint( - new URI("amqps://"+namespace+serviceBusEndpoint)).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString(); setupReceiver(connectionString, executor); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index ec47384..51bb5bf 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -16,8 +16,10 @@ */ package org.apache.nifi.processors.azure.eventhub; +import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -34,7 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.EventHubException; @@ -53,6 +54,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; @@ -91,16 +94,11 @@ public class PutAzureEventHub extends AbstractProcessor { .description("The name of the shared access policy. This policy must have Send claims.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(true) - .build(); - static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() - .name("Shared Access Policy Primary Key") - .description("The primary key of the shared access policy") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .sensitive(true) - .required(true) + .required(false) .build(); + static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY; + static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY; + static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() .name("partitioning-key-attribute-name") .displayName("Partitioning Key Attribute Name") @@ -144,6 +142,7 @@ public class PutAzureEventHub extends AbstractProcessor { _propertyDescriptors.add(NAMESPACE); _propertyDescriptors.add(ACCESS_POLICY); _propertyDescriptors.add(POLICY_PRIMARY_KEY); + _propertyDescriptors.add(USE_MANAGED_IDENTITY); _propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME); _propertyDescriptors.add(MAX_BATCH_SIZE); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); @@ -178,6 +177,13 @@ public class PutAzureEventHub extends AbstractProcessor { } @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context); + return retVal; + } + + + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { try { populateSenderQueue(context); @@ -314,8 +320,15 @@ public class PutAzureEventHub extends AbstractProcessor { final int numThreads = context.getMaxConcurrentTasks(); senderQueue = new LinkedBlockingQueue<>(numThreads); executor = Executors.newScheduledThreadPool(4); - final String policyName = context.getProperty(ACCESS_POLICY).getValue(); - final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + final boolean useManagedIdentiy = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); + final String policyName, policyKey; + if(useManagedIdentiy) { + policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY; + policyKey =null; + } else { + policyName = context.getProperty(ACCESS_POLICY).getValue(); + policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + } final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); for (int i = 0; i < numThreads; i++) { @@ -345,8 +358,14 @@ public class PutAzureEventHub extends AbstractProcessor { throws ProcessException{ try { - EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2"; - return EventHubClient.createSync(getConnectionString(namespace, eventHubName, policyName, policyKey), executor); + EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1"; + final String connectionString; + if(policyName == AzureEventHubUtils.MANAGED_IDENTITY_POLICY) { + connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName); + } else{ + connectionString = getConnectionString(namespace, eventHubName, policyName, policyKey); + } + return EventHubClient.createFromConnectionStringSync(connectionString, executor); } catch (IOException | EventHubException | IllegalConnectionStringFormatException e) { getLogger().error("Failed to create EventHubClient due to {}", new Object[]{e.getMessage()}, e); throw new ProcessException(e); @@ -354,7 +373,7 @@ public class PutAzureEventHub extends AbstractProcessor { } protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){ - return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString(); + return AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, eventHubName, policyName, policyKey); } /** diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java new file mode 100644 index 0000000..d95e27f --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.utils; + +import java.util.ArrayList; +import java.util.List; + +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +public final class AzureEventHubUtils { + + public static final String MANAGED_IDENTITY_POLICY = ConnectionStringBuilder.MANAGED_IDENTITY_AUTHENTICATION; + + public static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Primary Key") + .description("The primary key of the shared access policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .sensitive(true) + .required(false) + .build(); + + public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder() + .name("use-managed-identity") + .displayName("Use Azure Managed Identity") + .description("Choose whether or not to use the managed identity of Azure VM/VMSS") + .required(false).defaultValue("false").allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build(); + + public static List<ValidationResult> customValidate(PropertyDescriptor accessPolicyDescriptor, + PropertyDescriptor policyKeyDescriptor, + ValidationContext context) { + List<ValidationResult> retVal = new ArrayList<>(); + + boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet(); + boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet(); + boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); + + if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet) ) { + final String msg = String.format( + "('%s') and ('%s' with '%s') fields cannot be set at the same time.", + USE_MANAGED_IDENTITY.getDisplayName(), + accessPolicyDescriptor.getDisplayName(), + POLICY_PRIMARY_KEY.getDisplayName() + ); + retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); + } else if (!useManagedIdentity && (!accessPolicyIsSet || !policyKeyIsSet)) { + final String msg = String.format( + "either('%s') or (%s with '%s') must be set", + USE_MANAGED_IDENTITY.getDisplayName(), + accessPolicyDescriptor.getDisplayName(), + POLICY_PRIMARY_KEY.getDisplayName() + ); + retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); + } + return retVal; + } + + public static String getManagedIdentityConnectionString(final String namespace, final String eventHubName){ + return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName) + .setAuthentication(MANAGED_IDENTITY_POLICY).toString(); + } + public static String getSharedAccessSignatureConnectionString(final String namespace, final String eventHubName, final String sasName, final String sasKey) { + return new ConnectionStringBuilder() + .setNamespaceName(namespace) + .setEventHubName(eventHubName) + .setSasKeyName(sasName) + .setSasKey(sasKey).toString(); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index 6e55f60..88bcfa8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -78,7 +78,23 @@ public class GetAzureEventHubTest { testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000"); testRunner.assertValid(); } - + @Test + public void testProcessorConfigValidityWithManagedIdentityFlag() { + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,"true"); + testRunner.assertNotValid(); + testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4"); + testRunner.assertValid(); + testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z"); + testRunner.assertValid(); + testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5"); + testRunner.assertValid(); + testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000"); + testRunner.assertValid(); + } @Test public void verifyRelationships(){ assert(1 == processor.getRelationships().size()); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java index 83ed649..183b80d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -89,6 +89,15 @@ public class PutAzureEventHubTest { testRunner.assertValid(); } @Test + public void testProcessorConfigValidityWithManagedIdentityFlag() { + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,"true"); + testRunner.assertValid(); + } + @Test public void verifyRelationships(){ assert(2 == processor.getRelationships().size()); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index e7edef7..99541a5 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; @@ -39,6 +40,8 @@ import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessSession; import org.apache.nifi.util.SharedSessionState; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -64,8 +67,15 @@ import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockRecordWriter; public class TestConsumeAzureEventHub { + private static final String namespaceName = "nifi-azure-hub"; + private static final String eventHubName = "get-test"; + private static final String storageAccountName = "test-sa"; + private static final String storageAccountKey = "test-sa-key"; + private ConsumeAzureEventHub.EventProcessor eventProcessor; private MockProcessSession processSession; private SharedSessionState sharedState; @@ -97,7 +107,29 @@ public class TestConsumeAzureEventHub { when(partitionContext.getPartitionId()).thenReturn("partition-id"); when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group"); } - + @Test + public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException { + TestRunner testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + final MockRecordParser reader = new MockRecordParser(); + final MockRecordWriter writer = new MockRecordWriter(); + testRunner.addControllerService("writer", writer); + testRunner.enableControllerService(writer); + testRunner.addControllerService("reader", reader); + testRunner.enableControllerService(reader); + testRunner.setProperty(ConsumeAzureEventHub.RECORD_WRITER, "writer"); + testRunner.setProperty(ConsumeAzureEventHub.RECORD_READER, "reader"); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey); + testRunner.assertNotValid(); + + testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true"); + testRunner.assertValid(); + } @Test public void testReceiveOne() throws Exception { final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));