NIFI-4005: Azure Blob Storage SAS support, incorporating review comments. This closes #2353
- Renamed Azure to AzureStorageUtils. - Fixed whitespacing in property description. - Renamed SAS String to SAS Token. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1ee8d16a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1ee8d16a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1ee8d16a Branch: refs/heads/master Commit: 1ee8d16a211c247d1d3d7f1189be36a866d0ee98 Parents: 17ddaf6 Author: Koji Kawamura <[email protected]> Authored: Tue Dec 19 11:35:00 2017 +0900 Committer: Matt Gilman <[email protected]> Committed: Tue Dec 19 10:34:54 2017 -0500 ---------------------------------------------------------------------- .../azure/AbstractAzureBlobProcessor.java | 12 +- .../azure/storage/FetchAzureBlobStorage.java | 6 +- .../azure/storage/ListAzureBlobStorage.java | 28 ++--- .../azure/storage/PutAzureBlobStorage.java | 6 +- .../processors/azure/storage/utils/Azure.java | 124 ------------------ .../azure/storage/utils/AzureStorageUtils.java | 125 +++++++++++++++++++ .../processors/azure/storage/AzureTestUtil.java | 4 +- .../azure/storage/ITFetchAzureBlobStorage.java | 10 +- .../azure/storage/ITListAzureBlobStorage.java | 8 +- .../azure/storage/ITPutAzureStorageBlob.java | 8 +- 10 files changed, 166 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java index 9bb3d33..cab0ed9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java @@ -22,7 +22,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.util.Arrays; import java.util.Collection; @@ -40,10 +40,10 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor { private static final List<PropertyDescriptor> PROPERTIES = Collections .unmodifiableList(Arrays.asList( - Azure.CONTAINER, - Azure.PROP_SAS_TOKEN, - Azure.ACCOUNT_NAME, - Azure.ACCOUNT_KEY, + AzureStorageUtils.CONTAINER, + AzureStorageUtils.PROP_SAS_TOKEN, + AzureStorageUtils.ACCOUNT_NAME, + AzureStorageUtils.ACCOUNT_KEY, BLOB)); private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( @@ -58,7 +58,7 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor { @Override protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - return Azure.validateCredentialProperties(validationContext); + return AzureStorageUtils.validateCredentialProperties(validationContext); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java index f484f1a..d960f7d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -36,7 +36,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; @@ -61,12 +61,12 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { final long startNanos = System.nanoTime(); - String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); AtomicReference<Exception> storedException = new AtomicReference<>(); try { - CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger()); + CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger()); CloudBlobContainer container = blobClient.getContainerReference(containerName); final Map<String, String> attributes = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index 18669a1..7024e22 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -41,7 +41,7 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.list.AbstractListProcessor; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.BlobInfo; import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; @@ -83,10 +83,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( - Azure.CONTAINER, - Azure.PROP_SAS_TOKEN, - Azure.ACCOUNT_NAME, - Azure.ACCOUNT_KEY, + AzureStorageUtils.CONTAINER, + AzureStorageUtils.PROP_SAS_TOKEN, + AzureStorageUtils.ACCOUNT_NAME, + AzureStorageUtils.ACCOUNT_KEY, PROP_PREFIX)); @Override @@ -96,7 +96,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { @Override protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - return Azure.validateCredentialProperties(validationContext); + return AzureStorageUtils.validateCredentialProperties(validationContext); } @Override @@ -117,16 +117,16 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { @Override protected String getPath(final ProcessContext context) { - return context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue(); + return context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue(); } @Override protected boolean isListingResetNecessary(final PropertyDescriptor property) { // re-list if configuration changed, but not when security keys are rolled (not included in the condition) return PROP_PREFIX.equals(property) - || Azure.ACCOUNT_NAME.equals(property) - || Azure.CONTAINER.equals(property) - || Azure.PROP_SAS_TOKEN.equals(property); + || AzureStorageUtils.ACCOUNT_NAME.equals(property) + || AzureStorageUtils.CONTAINER.equals(property) + || AzureStorageUtils.PROP_SAS_TOKEN.equals(property); } @Override @@ -136,14 +136,14 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { @Override protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { - String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue(); + String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue(); String prefix = context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue(); if (prefix == null) { prefix = ""; } final List<BlobInfo> listing = new ArrayList<>(); try { - CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger()); + CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger()); CloudBlobContainer container = blobClient.getContainerReference(containerName); for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { @@ -165,9 +165,9 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { } if (blob instanceof CloudBlockBlob) { - builder.blobType(Azure.BLOCK); + builder.blobType(AzureStorageUtils.BLOCK); } else { - builder.blobType(Azure.PAGE); + builder.blobType(AzureStorageUtils.PAGE); } listing.add(builder.build()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index 65f8f2f..a316256 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -38,7 +38,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobProperties; @@ -66,13 +66,13 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { final long startNanos = System.nanoTime(); - String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); AtomicReference<Exception> storedException = new AtomicReference<>(); try { - CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger()); + CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger()); CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlob blob = container.getBlockBlobReference(blobPath); http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java deleted file mode 100644 index 4a734ab..0000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage.utils; - -import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.StorageCredentials; -import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; - -import java.net.URI; -import java.net.URISyntaxException; -import java.security.InvalidKeyException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -public final class Azure { - public static final String BLOCK = "Block"; - public static final String PAGE = "Page"; - - public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key") - .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + - "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " + - "There are certain risks in allowing the account key to be stored as a flowfile" + - "attribute. While it does provide for a more flexible flow by allowing the account key to " + - "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + - "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + - "In addition, the provenance repositories may be put on encrypted disk partitions.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).sensitive(true).build(); - - public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name") - .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile" + - "attribute. While it does provide for a more flexible flow by allowing the account name to " + - "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + - "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + - "In addition, the provenance repositories may be put on encrypted disk partitions.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); - - public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container Name") - .description("Name of the Azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); - - public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder() - .name("SAS String") - .description("Shared Access Signature string, including the leading '?'. Specify either SAS (recommended) or Account Key") - .required(false) - .expressionLanguageSupported(true) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - // use HTTPS by default as per MSFT recommendation - public static final String FORMAT_BLOB_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; - public static final String FORMAT_BASE_URI = "https://%s.blob.core.windows.net"; - - private Azure() { - // do not instantiate - } - - public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger) { - final String accountName = context.getProperty(Azure.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); - final String accountKey = context.getProperty(Azure.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); - final String sasToken = context.getProperty(Azure.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue(); - - CloudBlobClient cloudBlobClient; - - try { - // sas token and acct name/key have different ways of creating a secure connection (e.g. new StorageCredentialsAccountAndKey didn't work) - if (StringUtils.isNotBlank(sasToken)) { - String storageConnectionString = String.format(Azure.FORMAT_BASE_URI, accountName); - StorageCredentials creds = new StorageCredentialsSharedAccessSignature(sasToken); - cloudBlobClient = new CloudBlobClient(new URI(storageConnectionString), creds); - } else { - String blobConnString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey); - CloudStorageAccount storageAccount = CloudStorageAccount.parse(blobConnString); - cloudBlobClient = storageAccount.createCloudBlobClient(); - } - } catch (IllegalArgumentException | URISyntaxException e) { - logger.error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e); - throw new IllegalArgumentException(e); - } catch (InvalidKeyException e) { - logger.error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e); - throw new IllegalArgumentException(e); - } - - return cloudBlobClient; - } - - public static Collection<ValidationResult> validateCredentialProperties(ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(); - - String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue(); - String acctName = validationContext.getProperty(ACCOUNT_KEY).getValue(); - if ((StringUtils.isBlank(sasToken) && StringUtils.isBlank(acctName)) - || (StringUtils.isNotBlank(sasToken) && StringUtils.isNotBlank(acctName))) { - results.add(new ValidationResult.Builder().subject("Azure Credentials") - .valid(false) - .explanation("either Azure Account Key or Shared Access Signature required, but not both") - .build()); - } - - return results; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java new file mode 100644 index 0000000..360afbf --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public final class AzureStorageUtils { + public static final String BLOCK = "Block"; + public static final String PAGE = "Page"; + + public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key") + .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + + "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " + + "There are certain risks in allowing the account key to be stored as a flowfile" + + "attribute. While it does provide for a more flexible flow by allowing the account key to " + + "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).sensitive(true).build(); + + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name") + .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " + + "attribute. While it does provide for a more flexible flow by allowing the account name to " + + "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); + + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container Name") + .description("Name of the Azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); + + public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder() + .name("storage-sas-token") + .displayName("SAS Token") + .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key") + .required(false) + .expressionLanguageSupported(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + // use HTTPS by default as per MSFT recommendation + public static final String FORMAT_BLOB_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + public static final String FORMAT_BASE_URI = "https://%s.blob.core.windows.net"; + + private AzureStorageUtils() { + // do not instantiate + } + + public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger) { + final String accountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); + final String accountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); + final String sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue(); + + CloudBlobClient cloudBlobClient; + + try { + // sas token and acct name/key have different ways of creating a secure connection (e.g. new StorageCredentialsAccountAndKey didn't work) + if (StringUtils.isNotBlank(sasToken)) { + String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BASE_URI, accountName); + StorageCredentials creds = new StorageCredentialsSharedAccessSignature(sasToken); + cloudBlobClient = new CloudBlobClient(new URI(storageConnectionString), creds); + } else { + String blobConnString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey); + CloudStorageAccount storageAccount = CloudStorageAccount.parse(blobConnString); + cloudBlobClient = storageAccount.createCloudBlobClient(); + } + } catch (IllegalArgumentException | URISyntaxException e) { + logger.error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e); + throw new IllegalArgumentException(e); + } catch (InvalidKeyException e) { + logger.error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e); + throw new IllegalArgumentException(e); + } + + return cloudBlobClient; + } + + public static Collection<ValidationResult> validateCredentialProperties(ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + + String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue(); + String acctName = validationContext.getProperty(ACCOUNT_KEY).getValue(); + if ((StringUtils.isBlank(sasToken) && StringUtils.isBlank(acctName)) + || (StringUtils.isNotBlank(sasToken) && StringUtils.isNotBlank(acctName))) { + results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials") + .valid(false) + .explanation("either Azure Account Key or Shared Access Signature required, but not both") + .build()); + } + + return results; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java index b665eff..10bf59d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java @@ -25,7 +25,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.Properties; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.util.file.FileUtils; import com.microsoft.azure.storage.CloudStorageAccount; @@ -67,7 +67,7 @@ class AzureTestUtil { } static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey()); + String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey()); CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); return blobClient.getContainerReference(containerName); http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java index 4da7106..a9a2487 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.UUID; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -55,15 +55,15 @@ public class ITFetchAzureBlobStorage { try { runner.setValidateExpressionUsage(true); - runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName()); - runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); - runner.setProperty(Azure.CONTAINER, containerName); + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(AzureStorageUtils.CONTAINER, containerName); runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}"); final Map<String, String> attributes = new HashMap<>(); attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME); attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME); - attributes.put("azure.blobtype", Azure.BLOCK); + attributes.put("azure.blobtype", AzureStorageUtils.BLOCK); runner.enqueue(new byte[0], attributes); runner.run(); http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java index 1bc788a..311cf71 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java @@ -23,7 +23,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.UUID; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -49,9 +49,9 @@ public class ITListAzureBlobStorage { final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage()); try { - runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName()); - runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); - runner.setProperty(Azure.CONTAINER, containerName); + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(AzureStorageUtils.CONTAINER, containerName); // requires multiple runs to deal with List processor checking runner.run(3); http://git-wip-us.apache.org/repos/asf/nifi/blob/1ee8d16a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java index 43046db..8b15d85 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java @@ -22,7 +22,7 @@ import java.security.InvalidKeyException; import java.util.List; import java.util.UUID; -import org.apache.nifi.processors.azure.storage.utils.Azure; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -44,9 +44,9 @@ public class ITPutAzureStorageBlob { try { runner.setValidateExpressionUsage(true); - runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName()); - runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); - runner.setProperty(Azure.CONTAINER, containerName); + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(AzureStorageUtils.CONTAINER, containerName); runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload"); runner.enqueue("0123456789".getBytes());
