NIFI-4769: Use FlowFile for EL at Fetch and PutAzureBlobStorage

This commit adds back the existing capability for those Processors to use
incoming FlowFile attributes to compute account name and account key,
which had been removed by NIFI-4004.

Also, the same capability is added for the SAS token.

This closes #2400.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e77e66e8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e77e66e8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e77e66e8

Branch: refs/heads/HDF-3.1-maint
Commit: e77e66e8eb3ab10061bd3691e754715c480634f4
Parents: 6847c9e
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Fri Jan 12 11:03:05 2018 +0900
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Tue Jan 16 15:01:01 2018 -0500

----------------------------------------------------------------------
 .../azure/storage/FetchAzureBlobStorage.java    |  2 +-
 .../azure/storage/ListAzureBlobStorage.java     |  2 +-
 .../azure/storage/PutAzureBlobStorage.java      |  2 +-
 .../azure/storage/utils/AzureStorageUtils.java  | 33 ++++++++++++++++----
 4 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e77e66e8/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index d960f7d..89d22f3 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -66,7 +66,7 @@ public class FetchAzureBlobStorage extends 
AbstractAzureBlobProcessor {
 
         AtomicReference<Exception> storedException = new AtomicReference<>();
         try {
-            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger());
+            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
             CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
 
             final Map<String, String> attributes = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/e77e66e8/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 41f347c..2cb6316 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -151,7 +151,7 @@ public class ListAzureBlobStorage extends 
AbstractListProcessor<BlobInfo> {
         }
         final List<BlobInfo> listing = new ArrayList<>();
         try {
-            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger());
+            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
             CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
 
             for (ListBlobItem blob : container.listBlobs(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), null, null)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e77e66e8/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index a316256..c41733b 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -72,7 +72,7 @@ public class PutAzureBlobStorage extends 
AbstractAzureBlobProcessor {
 
         AtomicReference<Exception> storedException = new AtomicReference<>();
         try {
-            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger());
+            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
             CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
 
             CloudBlob blob = container.getBlockBlobReference(blobPath);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e77e66e8/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 360afbf..95b6d7e 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -42,7 +43,7 @@ public final class AzureStorageUtils {
     public static final PropertyDescriptor ACCOUNT_KEY = new 
PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage 
Account Key")
             .description("The storage account key. This is an admin-like 
password providing access to every container in this account. It is recommended 
" +
                     "one uses Shared Access Signature (SAS) token instead for 
fine-grained control with policies. " +
-                    "There are certain risks in allowing the account key to be 
stored as a flowfile" +
+                    "There are certain risks in allowing the account key to be 
stored as a flowfile " +
                     "attribute. While it does provide for a more flexible flow 
by allowing the account key to " +
                     "be fetched dynamically from a flow file attribute, care 
must be taken to restrict access to " +
                     "the event provenance data (e.g. by strictly controlling 
the policies governing provenance for this Processor). " +
@@ -63,7 +64,12 @@ public final class AzureStorageUtils {
     public static final PropertyDescriptor PROP_SAS_TOKEN = new 
PropertyDescriptor.Builder()
             .name("storage-sas-token")
             .displayName("SAS Token")
-            .description("Shared Access Signature token, including the leading 
'?'. Specify either SAS Token (recommended) or Account Key")
+            .description("Shared Access Signature token, including the leading 
'?'. Specify either SAS Token (recommended) or Account Key. " +
+                    "There are certain risks in allowing the SAS token to be 
stored as a flowfile " +
+                    "attribute. While it does provide for a more flexible flow 
by allowing the account name to " +
+                    "be fetched dynamically from a flowfile attribute, care 
must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling 
the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on 
encrypted disk partitions.")
             .required(false)
             .expressionLanguageSupported(true)
             .sensitive(true)
@@ -78,10 +84,25 @@ public final class AzureStorageUtils {
         // do not instantiate
     }
 
-    public static CloudBlobClient createCloudBlobClient(ProcessContext 
context, ComponentLog logger) {
-        final String accountName = 
context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
-        final String accountKey = 
context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
-        final String sasToken = 
context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+    /**
+     * Create CloudBlobClient instance.
+     * @param flowFile An incoming FlowFile can be used for NiFi Expression 
Language evaluation to derive
+     *                 Account Name, Account Key or SAS Token. This can be 
null if not available.
+     */
+    public static CloudBlobClient createCloudBlobClient(ProcessContext 
context, ComponentLog logger, FlowFile flowFile) {
+        final String accountName;
+        final String accountKey;
+        final String sasToken;
+
+        if (flowFile == null) {
+            accountName = 
context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+            accountKey = 
context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+            sasToken = 
context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+        } else {
+            accountName = 
context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            accountKey = 
context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            sasToken = 
context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+        }
 
         CloudBlobClient cloudBlobClient;
 

Reply via email to