This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 852715a  NIFI-7409: Azure managed identity support to Azure Datalake processors
852715a is described below

commit 852715aadd9e989d37aeadecfe92093e212a5ad1
Author: sjyang18 <ilson...@hotmail.com>
AuthorDate: Fri May 1 18:55:50 2020 +0000

    NIFI-7409: Azure managed identity support to Azure Datalake processors
    
    NIFI-7409: review changes
    NIFI-7409: ordering import statements
    NIFI-7409: changed validateCredentialProperties logic
    
    This closes #4249.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi-azure-processors/pom.xml                  |  16 +--
 .../AbstractAzureDataLakeStorageProcessor.java     | 111 +++++++++++++++------
 ...eStorageCredentialsControllerServiceLookup.java |   3 +-
 .../storage/TestAbstractAzureDataLakeStorage.java  |  19 +++-
 nifi-nar-bundles/nifi-azure-bundle/pom.xml         |  42 ++++++++
 5 files changed, 147 insertions(+), 44 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 0a3602d..2cff8ee 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -57,6 +57,16 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-core</artifactId>
+            <version>1.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-identity</artifactId>
+            <version>1.0.6</version>
+        </dependency>
+        <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs</artifactId>
             <version>${azure-eventhubs.version}</version>
@@ -75,12 +85,6 @@
             <artifactId>azure-storage-file-datalake</artifactId>
             <version>12.1.1</version>
         </dependency>
-        <!-- overriding jackson-core in azure-storage -->
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-            <version>2.10.3</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 40d276c..af75f99 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -16,30 +16,32 @@
  */
 package org.apache.nifi.processors.azure;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.azure.identity.ManagedIdentityCredential;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.commons.lang3.StringUtils;
-
-import com.azure.storage.common.StorageSharedKeyCredential;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Map;
 
 public abstract class AbstractAzureDataLakeStorageProcessor extends 
AbstractProcessor {
 
@@ -85,6 +87,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor 
extends AbstractProc
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor USE_MANAGED_IDENTITY = new 
PropertyDescriptor.Builder()
+            .name("use-managed-identity")
+            .displayName("Use Azure Managed Identity")
+            .description("Choose whether or not to use the managed identity of 
Azure VM/VMSS ")
+            .required(false).defaultValue("false").allowableValues("true", 
"false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+
     public static final PropertyDescriptor FILESYSTEM = new 
PropertyDescriptor.Builder()
             .name("filesystem-name").displayName("Filesystem Name")
             .description("Name of the Azure Storage File System. It is assumed 
to be already existing.")
@@ -110,6 +119,15 @@ public abstract class 
AbstractAzureDataLakeStorageProcessor extends AbstractProc
             .defaultValue("${azure.filename}")
             .build();
 
+    public static final PropertyDescriptor ENDPOINT_SUFFIX = new 
PropertyDescriptor.Builder()
+            .name("endpoint-suffix").displayName("Endpoint Suffix")
+            .description("Endpoint Suffix")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .defaultValue("dfs.core.windows.net")
+            .build();
+
     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description(
             "Files that have been successfully written to Azure storage are 
transferred to this relationship")
             .build();
@@ -118,9 +136,14 @@ public abstract class 
AbstractAzureDataLakeStorageProcessor extends AbstractProc
             .build();
 
     private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(
-            Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, 
AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY,
-                    AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, 
AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
-                    AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
AbstractAzureDataLakeStorageProcessor.FILE));
+            Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME,
+                    AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY,
+                    AbstractAzureDataLakeStorageProcessor.SAS_TOKEN,
+                    AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY,
+                    AbstractAzureDataLakeStorageProcessor.ENDPOINT_SUFFIX,
+                    AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
+                    AbstractAzureDataLakeStorageProcessor.DIRECTORY,
+                    AbstractAzureDataLakeStorageProcessor.FILE));
 
     private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(
@@ -134,17 +157,32 @@ public abstract class 
AbstractAzureDataLakeStorageProcessor extends AbstractProc
 
     public static Collection<ValidationResult> 
validateCredentialProperties(final ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>();
-        final String accountName = 
validationContext.getProperty(ACCOUNT_NAME).getValue();
-        final String accountKey = 
validationContext.getProperty(ACCOUNT_KEY).getValue();
-        final String sasToken = 
validationContext.getProperty(SAS_TOKEN).getValue();
-
-        if (StringUtils.isNotBlank(accountName)
-                && ((StringUtils.isNotBlank(accountKey) && 
StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && 
StringUtils.isBlank(sasToken)))) {
-            results.add(new ValidationResult.Builder().subject("Azure Storage 
Credentials").valid(false)
-                    .explanation("either " + ACCOUNT_NAME.getDisplayName() + " 
with " + ACCOUNT_KEY.getDisplayName() +
-                            " or " + ACCOUNT_NAME.getDisplayName() + " with " 
+ SAS_TOKEN.getDisplayName() +
-                            " must be specified, not both")
-                    .build());
+
+        final boolean useManagedIdentity = 
validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+        final boolean accountKeyIsSet  = 
validationContext.getProperty(ACCOUNT_KEY).isSet();
+        final boolean sasTokenIsSet     = 
validationContext.getProperty(SAS_TOKEN).isSet();
+
+        int credential_config_found = 0;
+        if(useManagedIdentity) credential_config_found++;
+        if(accountKeyIsSet) credential_config_found++;
+        if(sasTokenIsSet) credential_config_found++;
+
+        if(credential_config_found == 0){
+            final String msg = String.format(
+                "At least one of ['%s', '%s', '%s'] should be set",
+                ACCOUNT_KEY.getDisplayName(),
+                SAS_TOKEN.getDisplayName(),
+                USE_MANAGED_IDENTITY.getDisplayName()
+            );
+            results.add(new ValidationResult.Builder().subject("Credentials 
config").valid(false).explanation(msg).build());
+        } else if(credential_config_found > 1) {
+            final String msg = String.format(
+                "Only one of ['%s', '%s', '%s'] should be set",
+                ACCOUNT_KEY.getDisplayName(),
+                SAS_TOKEN.getDisplayName(),
+                USE_MANAGED_IDENTITY.getDisplayName()
+            );
+            results.add(new ValidationResult.Builder().subject("Credentials 
config").valid(false).explanation(msg).build());
         }
         return results;
     }
@@ -154,7 +192,9 @@ public abstract class AbstractAzureDataLakeStorageProcessor 
extends AbstractProc
         final String accountName = 
context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
         final String accountKey = 
context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
         final String sasToken = 
context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
-        final String endpoint = 
String.format("https://%s.dfs.core.windows.net", accountName);
+        final String endpointSuffix = 
context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue();
+        final String endpoint = String.format("https://%s.%s", 
accountName,endpointSuffix);
+        final boolean useManagedIdentity = 
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
         DataLakeServiceClient storageClient;
         if (StringUtils.isNotBlank(accountKey)) {
             final StorageSharedKeyCredential credential = new 
StorageSharedKeyCredential(accountName,
@@ -164,6 +204,13 @@ public abstract class 
AbstractAzureDataLakeStorageProcessor extends AbstractProc
         } else if (StringUtils.isNotBlank(sasToken)) {
             storageClient = new 
DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
                     .buildClient();
+        } else if(useManagedIdentity){
+            final ManagedIdentityCredential misCrendential = new 
ManagedIdentityCredentialBuilder()
+                                                                .build();
+            storageClient = new  DataLakeServiceClientBuilder()
+                                    .endpoint(endpoint)
+                                    .credential(misCrendential)
+                                    .buildClient();
         } else {
             throw new IllegalArgumentException(String.format("Either '%s' or 
'%s' must be defined.",
                     ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName()));
@@ -181,4 +228,4 @@ public abstract class AbstractAzureDataLakeStorageProcessor 
extends AbstractProc
     public Set<Relationship> getRelationships() {
         return RELATIONSHIPS;
     }
-}
\ No newline at end of file
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
index 4ac2f07..1cfe1a7 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
@@ -16,12 +16,13 @@
  */
 package org.apache.nifi.services.azure.storage;
 
+import java.util.Map;
+
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import 
org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
-import java.util.Map;
 
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", 
"credentials" })
 @CapabilityDescription("Provides an AzureStorageCredentialsService that can be 
used to dynamically select another AzureStorageCredentialsService. " +
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
index 59800bb..960b1fe 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
@@ -16,17 +16,18 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.SAS_TOKEN;
+import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
 
 public class TestAbstractAzureDataLakeStorage {
 
@@ -58,6 +59,14 @@ public class TestAbstractAzureDataLakeStorage {
     }
 
     @Test
+    public void testValidWhenAccountNameAndUseManagedIdentity() {
+        runner.removeProperty(ACCOUNT_KEY);
+        runner.setProperty(USE_MANAGED_IDENTITY, "true");
+
+        runner.assertValid();
+    }
+
+    @Test
     public void testNotValidWhenNoAccountNameSpecified() {
         runner.removeProperty(ACCOUNT_NAME);
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index d4983e1..c5ecc49 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -27,6 +27,7 @@
 
     <properties>
         <azure-storage.version>8.4.0</azure-storage.version>
+        <jackson.version>2.10.3</jackson.version>
     </properties>
 
     <modules>
@@ -50,6 +51,47 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+            <!-- dependency convergency resolution -->
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-annotations</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.dataformat</groupId>
+                <artifactId>jackson-dataformat-xml</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.module</groupId>
+                <artifactId>jackson-module-jaxb-annotations</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.datatype</groupId>
+                <artifactId>jackson-datatype-jsr310</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-lang3</artifactId>
+                <version>3.9</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-text</artifactId>
+                <version>1.8</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>

Reply via email to