NIFI-1833 - Addressed issues from PR review. NIFI-1833 Moved AbstractListProcessor.java, EntityListing.java, and ListableEntity.java from nifi-standard-processors into nifi-processor-utils. Moved TestAbstractListProcessor.java into nifi-processor-utils. Set nifi-azure-nar's nar dependency back to nifi-standard-services-api-nar. Fixed failing integration tests (ITFetchAzureBlobStorage.java, ITListAzureBlobStorage.java, and ITPutAzureStorageBlob.java) and refactored them to be able to run in parallel.
NIFI-1833 Moved security notice info in the additional details documentation into the descriptions of the specific attributes for which those notices are intended Added displayName usage to properties Updated exception handling in FetchAzureBlobStorage.java and PutAzureBlobStorage.java to cause flowfiles with Output/InputStreamCallback failures to be routed to the processor's failure relationship Cleaned up dependencies in pom NIFI-1833 Removed unnecessary calls to map on Optional in the onTrigger exception handling of FetchAzureBlobStorage.java and PutAzureBlobStorage.java NIFI-1833 Updates due to nifi-processor-utils being moved under nifi-nar-bundles This closes #1719. Signed-off-by: Bryan Rosander <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/26d90fbc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/26d90fbc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/26d90fbc Branch: refs/heads/master Commit: 26d90fbccfa2c4c376e663f6be9c12856decc592 Parents: f30c816 Author: Jeff Storck <[email protected]> Authored: Sat Apr 29 20:23:35 2017 -0400 Committer: Bryan Rosander <[email protected]> Committed: Tue May 2 14:39:46 2017 -0400 ---------------------------------------------------------------------- .../nifi-azure-bundle/nifi-azure-nar/pom.xml | 2 +- .../nifi-azure-processors/pom.xml | 33 +- .../azure/AbstractAzureBlobProcessor.java | 4 +- .../azure/AbstractAzureProcessor.java | 8 +- .../nifi/processors/azure/AzureConstants.java | 18 +- .../azure/storage/FetchAzureBlobStorage.java | 19 +- .../azure/storage/ListAzureBlobStorage.java | 18 +- .../azure/storage/PutAzureBlobStorage.java | 17 +- .../azure/storage/utils/BlobInfo.java | 2 +- .../additionalDetails.html | 39 -- .../additionalDetails.html | 39 -- .../additionalDetails.html | 39 -- .../azure/storage/AbstractAzureIT.java | 97 ---- .../processors/azure/storage/AzureTestUtil.java | 76 +++ 
.../azure/storage/ITFetchAzureBlobStorage.java | 58 +- .../azure/storage/ITListAzureBlobStorage.java | 47 +- .../azure/storage/ITPutAzureStorageBlob.java | 42 +- .../nifi-processor-utils/pom.xml | 9 + .../util/list/AbstractListProcessor.java | 521 ++++++++++++++++++ .../nifi/processor/util/list/EntityListing.java | 71 +++ .../processor/util/list/ListableEntity.java | 40 ++ .../util/list/TestAbstractListProcessor.java | 528 +++++++++++++++++++ .../standard/AbstractListProcessor.java | 523 ------------------ .../nifi/processors/standard/ListFile.java | 1 + .../processors/standard/ListFileTransfer.java | 1 + .../processors/standard/util/EntityListing.java | 71 --- .../nifi/processors/standard/util/FileInfo.java | 2 + .../standard/util/ListableEntity.java | 40 -- .../standard/TestAbstractListProcessor.java | 527 ------------------ .../nifi/processors/standard/TestListFile.java | 1 + pom.xml | 5 + 31 files changed, 1404 insertions(+), 1494 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml index e6c3c9b..f75bb7f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml @@ -38,7 +38,7 @@ <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-standard-nar</artifactId> + <artifactId>nifi-standard-services-api-nar</artifactId> <type>nar</type> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 9b4f28b..6133e30 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -19,9 +19,6 @@ </parent> <artifactId>nifi-azure-processors</artifactId> <packaging>jar</packaging> - <properties> - <powermock.version>1.6.5</powermock.version> - </properties> <dependencies> <dependency> <groupId>org.apache.nifi</groupId> @@ -32,24 +29,10 @@ <artifactId>nifi-utils</artifactId> </dependency> <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-eventhubs</artifactId> <version>0.9.0</version> </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <version>2.8.6</version> - </dependency> <!--<dependency> <groupId>com.microsoft.eventhubs.client</groupId> <artifactId>eventhubs-client</artifactId> @@ -66,26 +49,18 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> </dependency> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-standard-processors</artifactId> - <version>${project.version}</version> - <scope>provided</scope> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + <scope>test</scope> 
</dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java index 2026711..f0729e6 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java @@ -25,8 +25,8 @@ import java.util.List; public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor { - public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build(); + public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("blob").displayName("Blob").description("The filename of the blob") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build(); private static final List<PropertyDescriptor> PROPERTIES = Collections .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java index c95ee99..5812236 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java @@ -34,7 +34,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build(); public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build(); - public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); protected CloudStorageAccount createStorageConnection(ProcessContext context) { final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); @@ -67,11 +67,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor { */ private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { CloudStorageAccount storageAccount; - try { - storageAccount = CloudStorageAccount.parse(storageConnectionString); - } 
catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) { - throw e; - } + storageAccount = CloudStorageAccount.parse(storageConnectionString); return storageAccount; } http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java index 9a51030..1e0cde3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java @@ -23,14 +23,24 @@ public final class AzureConstants { public static final String BLOCK = "Block"; public static final String PAGE = "Page"; - public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key") + public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key") + .description("The storage account key. There are certain risks in allowing the account key to be stored as a flowfile" + + "attribute. While it does provide for a more flexible flow by allowing the account key to " + + "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). 
" + + "In addition, the provenance repositories may be put on encrypted disk partitions.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); - public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("Storage Account Name").description("The storage account name") + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name") + .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile" + + "attribute. While it does provide for a more flexible flow by allowing the account name to " + + "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); - public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container name") + .description("Name of the azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); // use HTTPS by default as per MSFT recommendation public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; 
http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java index 163a962..cd08ec1 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -23,11 +23,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -75,6 +77,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); + AtomicReference<Exception> storedException = new AtomicReference<>(); try { CloudStorageAccount storageAccount = 
createStorageConnection(context, flowFile); CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); @@ -89,6 +92,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { try { blob.download(os); } catch (StorageException e) { + storedException.set(e); throw new IOException(e); } }); @@ -103,10 +107,15 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis); - - } catch (IllegalArgumentException | URISyntaxException | StorageException e1) { - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); + } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) { + if (e instanceof ProcessException && storedException.get() == null) { + throw (ProcessException) e; + } else { + Exception failureException = Optional.ofNullable(storedException.get()).orElse(e); + getLogger().error("Failure to fetch Azure blob {}", new Object[]{blobPath}, failureException); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index f8a6c4d..148f724 100644 --- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -40,10 +40,10 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.BlobInfo; import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; -import org.apache.nifi.processors.standard.AbstractListProcessor; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; @@ -59,7 +59,9 @@ import com.microsoft.azure.storage.blob.ListBlobItem; @TriggerSerially @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class }) -@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage") +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage. " + + "This Processor is designed to run on Primary Node only in a cluster. 
If the primary node changes, the new Primary Node will pick up where the " + + "previous node left off without duplicating all of the data.") @InputRequirement(Requirement.INPUT_FORBIDDEN) @WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"), @@ -71,12 +73,14 @@ import com.microsoft.azure.storage.blob.ListBlobItem; @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"), @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) -@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " - + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.") +@Stateful(scopes = { Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " + + "This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. 
State is " + + "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " + + "where the previous node left off, without duplicating the data.") public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { - private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true).required(false).build(); + private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("prefix").displayName("Prefix").description("Search prefix for listing") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).build(); private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX)); @@ -155,7 +159,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { return listing; } - protected CloudStorageAccount createStorageConnection(ProcessContext context) { + private CloudStorageAccount createStorageConnection(ProcessContext context) { final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey); http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index e03bc25..e164635 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -22,7 +22,9 @@ import java.io.InputStream; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -69,6 +71,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); + AtomicReference<Exception> storedException = new AtomicReference<>(); try { CloudStorageAccount storageAccount = createStorageConnection(context, flowFile); CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); @@ -94,6 +97,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { attributes.put("azure.length", String.valueOf(length)); attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); } catch (StorageException | URISyntaxException e) { + storedException.set(e); throw new IOException(e); } }); @@ -106,10 +110,15 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), 
transferMillis); - } catch (IllegalArgumentException | URISyntaxException | StorageException e) { - getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); + } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) { + if (e instanceof ProcessException && storedException.get() == null) { + throw (ProcessException) e; + } else { + Exception failureException = Optional.ofNullable(storedException.get()).orElse(e); + getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, failureException); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java index 6907d94..28a47ea 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java @@ -18,7 +18,7 @@ package org.apache.nifi.processors.azure.storage.utils; import java.io.Serializable; -import org.apache.nifi.processors.standard.util.ListableEntity; +import org.apache.nifi.processor.util.list.ListableEntity; public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEntity { private static final long serialVersionUID = 1L; 
http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html deleted file mode 100644 index b4b8e3b..0000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html +++ /dev/null @@ -1,39 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<head> - <meta charset="utf-8" /> - <title>FetchAzureBlobStorage Processor</title> - <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> -</head> - -<body> - -<h2>Apache NiFi Azure Processors</h2> - -<h3>Important Security Note</h3> -<p> - There are certain risks in allowing the account name and key to be stored as flowfile - attributes. While it does provide for a more flexible flow by allowing the account name and key - be fetched dynamically from the flow file attributes, care must be taken to restrict access to - the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). - In addition, the provenance repositories may be put on encrypted disk partitions. -</p> -<p> - <a href="#" onclick="history.back()">Return to a previous page</a> -</p> -</body> -</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html deleted file mode 100644 index 76e8775..0000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html +++ /dev/null @@ -1,39 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. 
- The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<head> - <meta charset="utf-8" /> - <title>ListAzureBlobStorage Processor</title> - <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> -</head> - -<body> - -<h2>Apache NiFi Azure Processors</h2> - -<h3>Important Security Note</h3> -<p> - There are certain risks in allowing the account name and key to be stored as flowfile - attributes. While it does provide for a more flexible flow by allowing the account name and key - be fetched dynamically from the flow file attributes, care must be taken to restrict access to - the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). - In addition, the provenance repositories may be put on encrypted disk partitions. 
-</p> -<p> - <a href="#" onclick="history.back()">Return to a previous page</a> -</p> -</body> -</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html deleted file mode 100644 index 0a7ff35..0000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html +++ /dev/null @@ -1,39 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<head> - <meta charset="utf-8" /> - <title>PutAzureBlobStorage Processor</title> - <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> -</head> - -<body> - -<h2>Apache NiFi Azure Processors</h2> - -<h3>Important Security Note</h3> -<p> - There are certain risks in allowing the account name and key to be stored as flowfile - attributes. While it does provide for a more flexible flow by allowing the account name and key - be fetched dynamically from the flow file attributes, care must be taken to restrict access to - the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). - In addition, the provenance repositories may be put on encrypted disk partitions. -</p> -<p> - <a href="#" onclick="history.back()">Return to a previous page</a> -</p> -</body> -</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java deleted file mode 100644 index 91a8c73..0000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; -import com.microsoft.azure.storage.blob.ListBlobItem; -import org.apache.nifi.processors.azure.AzureConstants; -import org.apache.nifi.util.file.FileUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URISyntaxException; -import java.security.InvalidKeyException; -import java.util.Properties; - -import static org.junit.Assert.fail; - -public abstract class AbstractAzureIT { - protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; - public static final String TEST_CONTAINER_NAME = "nifitest"; - - private static final Properties CONFIG; - protected static final String TEST_BLOB_NAME = "testing"; - protected static final String TEST_TABLE_NAME = "testing"; - - static { - final FileInputStream fis; - CONFIG = new Properties(); - try { - fis = new FileInputStream(CREDENTIALS_FILE); - try { - CONFIG.load(fis); - } catch (IOException e) { - fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); - } finally { - 
FileUtils.closeQuietly(fis); - } - } catch (FileNotFoundException e) { - fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); - } - - } - - @BeforeClass - public static void oneTimeSetup() throws StorageException, InvalidKeyException, URISyntaxException { - CloudBlobContainer container = getContainer(); - container.createIfNotExists(); - } - - @AfterClass - public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException { - CloudBlobContainer container = getContainer(); - for (ListBlobItem blob : container.listBlobs()) { - if (blob instanceof CloudBlob) { - ((CloudBlob) blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null); - } - } - } - - public static String getAccountName() { - return CONFIG.getProperty("accountName"); - } - - public static String getAccountKey() { - return CONFIG.getProperty("accountKey"); - } - - protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey()); - CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); - CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); - return blobClient.getContainerReference(TEST_CONTAINER_NAME); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java new file mode 100644 index 
0000000..636845c --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import static org.junit.Assert.fail; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.Properties; + +import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.util.file.FileUtils; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; + +class AzureTestUtil { + private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; + static final String TEST_CONTAINER_NAME_PREFIX = "nifitest"; + + private static final Properties CONFIG; + static final String TEST_BLOB_NAME = "testing"; + + static { + final FileInputStream fis; + CONFIG = new 
Properties(); + try { + fis = new FileInputStream(CREDENTIALS_FILE); + try { + CONFIG.load(fis); + } catch (IOException e) { + fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); + } finally { + FileUtils.closeQuietly(fis); + } + } catch (FileNotFoundException e) { + fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); + } + + } + + static String getAccountName() { + return CONFIG.getProperty("accountName"); + } + + static String getAccountKey() { + return CONFIG.getProperty("accountKey"); + } + + static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { + String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey()); + CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + return blobClient.getContainerReference(containerName); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java index 7dc8830..eeedd3b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java @@ 
-16,12 +16,15 @@ */ package org.apache.nifi.processors.azure.storage; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.nifi.processors.azure.AbstractAzureProcessor; import org.apache.nifi.processors.azure.AzureConstants; @@ -31,32 +34,47 @@ import org.apache.nifi.util.TestRunners; import org.junit.Test; import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; -public class ITFetchAzureBlobStorage extends AbstractAzureIT { +public class ITFetchAzureBlobStorage { @Test public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException { + String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID()); + CloudBlobContainer container = AzureTestUtil.getContainer(containerName); + container.createIfNotExists(); + + CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME); + byte[] buf = "0123456789".getBytes(); + InputStream in = new ByteArrayInputStream(buf); + blob.upload(in, 10); + final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage()); - runner.setValidateExpressionUsage(true); - - runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName()); - runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey()); - runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME); - runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}"); - - final Map<String, String> attributes = new HashMap<>(); - attributes.put("azure.primaryUri", "http://" + getAccountName() + ".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME); - attributes.put("azure.blobname", 
TEST_BLOB_NAME); - attributes.put("azure.blobtype", AzureConstants.BLOCK); - runner.enqueue(new byte[0], attributes); - runner.run(); - - runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1); - List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); - for (MockFlowFile flowFile : flowFilesForRelationship) { - flowFile.assertContentEquals("0123456789".getBytes()); - flowFile.assertAttributeEquals("azure.length", "10"); + try { + runner.setValidateExpressionUsage(true); + + runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(AzureConstants.CONTAINER, containerName); + runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME); + attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME); + attributes.put("azure.blobtype", AzureConstants.BLOCK); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1); + List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesForRelationship) { + flowFile.assertContentEquals("0123456789".getBytes()); + flowFile.assertAttributeEquals("azure.length", "10"); + } + } finally { + container.deleteIfExists(); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java index 277538c..6dd9088 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java @@ -21,55 +21,50 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; import java.security.InvalidKeyException; +import java.util.UUID; import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobContainer; -public class ITListAzureBlobStorage extends AbstractAzureIT { +public class ITListAzureBlobStorage { - @BeforeClass - public static void setupSomeFiles() throws InvalidKeyException, URISyntaxException, StorageException, IOException { - CloudBlobContainer container = getContainer(); + @Test + public void testListsAzureBlobStorageContent() throws InvalidKeyException, StorageException, URISyntaxException, IOException { + String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID()); + CloudBlobContainer container = AzureTestUtil.getContainer(containerName); container.createIfNotExists(); - CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME); + CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME); 
byte[] buf = "0123456789".getBytes(); InputStream in = new ByteArrayInputStream(buf); blob.upload(in, 10); - } - @AfterClass - public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException { - CloudBlobContainer container = getContainer(); - container.deleteIfExists(); - } - - @Test - public void testListsAzureBlobStorageContent() { final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage()); - runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName()); - runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey()); - runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME); + try { + runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(AzureConstants.CONTAINER, containerName); - // requires multiple runs to deal with List processor checking - runner.run(3); + // requires multiple runs to deal with List processor checking + runner.run(3); - runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1); - runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1); + runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1); - for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) { - entry.assertAttributeEquals("azure.length", "10"); - entry.assertAttributeEquals("mime.type", "application/octet-stream"); + for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) { + entry.assertAttributeEquals("azure.length", "10"); + entry.assertAttributeEquals("mime.type", "application/octet-stream"); + } + } finally { + container.deleteIfExists(); } } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java index 0308add..5cecdbc 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java @@ -17,7 +17,10 @@ package org.apache.nifi.processors.azure.storage; import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; import java.util.List; +import java.util.UUID; import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.util.MockFlowFile; @@ -25,27 +28,38 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -public class ITPutAzureStorageBlob extends AbstractAzureIT { +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; + +public class ITPutAzureStorageBlob { @Test - public void testPuttingBlob() throws IOException { + public void testPuttingBlob() throws IOException, InvalidKeyException, StorageException, URISyntaxException { + String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID()); + CloudBlobContainer container = AzureTestUtil.getContainer(containerName); + container.createIfNotExists(); + final TestRunner runner = TestRunners.newTestRunner(new 
PutAzureBlobStorage()); - runner.setValidateExpressionUsage(true); + try { + runner.setValidateExpressionUsage(true); - runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName()); - runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey()); - runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME); - runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload"); + runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(AzureConstants.CONTAINER, containerName); + runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload"); - runner.enqueue("0123456789".getBytes()); - runner.run(); + runner.enqueue("0123456789".getBytes()); + runner.run(); - runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); - List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS); - for (MockFlowFile flowFile : flowFilesForRelationship) { - flowFile.assertContentEquals("0123456789".getBytes()); - flowFile.assertAttributeEquals("azure.length", "10"); + runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); + List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesForRelationship) { + flowFile.assertContentEquals("0123456789".getBytes()); + flowFile.assertAttributeEquals("azure.length", "10"); + } + } finally { + container.deleteIfExists(); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml index f7bcb43..711ddae 100644 --- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml @@ -55,6 +55,15 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java new file mode 100644 index 0000000..2666e2c --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processor.util.list; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import 
org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * <p> + * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources. + * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that + * we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor. + * </p> + * <p> + * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities + * or entities that have been modified will be emitted from the Processor. + * </p> + * <p> + * In order to make use of this abstract class, the entities listed must meet the following criteria: + * </p> + * <ul> + * <li> + * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is + * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled. + * </li> + * <li> + * If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later + * than the last timestamp pulled, then the entity is considered new. + * </li> + * <li> + * Entity must have a user-readable name that can be used for logging purposes. + * </li> + * </ul> + * <p> + * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is + * performed using the {@link StateManager}. 
This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp + * that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this information is used in order to + * determine new entities. + * </p> + * <p> + * NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the <code>Distributed Cache + * Service</code> property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged. + * </p> + * <p> + * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set + * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for + * the configured dataflow. + * </p> + * <p> + * Subclasses are responsible for the following: + * </p> + * <ul> + * <li> + * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all + * entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those + * entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability + * to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation. + * </li> + * <li> + * Creating a Map of attributes that are applicable for an entity. 
The attributes that are assigned to each FlowFile are exactly those returned by the + * {@link #createAttributes(ListableEntity, ProcessContext)}. + * </li> + * <li> + * Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only + * within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this concept + * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method. + * </li> + * <li> + * Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user + * changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning + * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared. + * </li> + * </ul> + */ +@TriggerSerially +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. " + + "The scope used depends on the implementation.") +public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor { + + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node " + + "begins pulling data, it won't duplicate all of the work that has been done. 
If not specified, the information will not be shared across the cluster. " + + "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.") + .required(false) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are received are routed to success") + .build(); + + private volatile Long lastListingTime = null; + private volatile Long lastProcessedTime = 0L; + private volatile Long lastRunTime = 0L; + private volatile boolean justElectedPrimaryNode = false; + private volatile boolean resetState = false; + + /* + * A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest + * files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled + * near instantaneously after the prior iteration effectively voiding the built in buffer + */ + public static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L); + static final String LISTING_TIMESTAMP_KEY = "listing.timestamp"; + static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp"; + + public File getPersistenceFile() { + return new File("conf/state/" + getIdentifier()); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DISTRIBUTED_CACHE_SERVICE); + return properties; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (isConfigurationRestored() && isListingResetNecessary(descriptor)) { + resetTimeStates(); // clear lastListingTime so that we have to fetch new time + resetState = true; + } + } + + + @Override + public Set<Relationship> 
getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + return relationships; + } + + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange(final PrimaryNodeState newState) { + justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE); + } + + @OnScheduled + public final void updateState(final ProcessContext context) throws IOException { + final String path = getPath(context); + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + + // Check if state already exists for this path. If so, we have already migrated the state. + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); + if (stateMap.getVersion() == -1L) { + try { + // Migrate state from the old way of managing state (distributed cache service and local file) + // to the new mechanism (State Manager). + migrateState(path, client, context.getStateManager(), getStateScope(context)); + } catch (final IOException ioe) { + throw new IOException("Failed to properly migrate state to State Manager", ioe); + } + } + + // When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp + if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) { + getLogger().info("Detected that state was cleared for this component. Resetting internal values."); + resetTimeStates(); + } + + if (resetState) { + context.getStateManager().clear(getStateScope(context)); + resetState = false; + } + } + + /** + * This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of + * the StateManager. This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager, + * if any state already exists. 
More specifically, this will extract out the relevant timestamp for when the processor last ran + * + * @param path the path to migrate state for + * @param client the DistributedMapCacheClient that is capable of obtaining the current state + * @param stateManager the StateManager to use in order to store the new state + * @param scope the scope to use + * @throws IOException if unable to retrieve or store the state + */ + private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException { + Long minTimestamp = null; + + // Retrieve state from Distributed Cache Client, establishing the latest file seen + if (client != null) { + final StringSerDe serde = new StringSerDe(); + final String serializedState = client.get(getKey(path), serde, serde); + if (serializedState != null && !serializedState.isEmpty()) { + final EntityListing listing = deserialize(serializedState); + minTimestamp = listing.getLatestTimestamp().getTime(); + } + + // remove entry from distributed cache server + if (client != null) { + try { + client.remove(path, new StringSerDe()); + } catch (final IOException ioe) { + getLogger().warn("Failed to remove entry from Distributed Cache Service. 
However, the state has already been migrated to use the new " + + "State Management service, so the Distributed Cache Service is no longer needed."); + } + } + } + + // Retrieve state from locally persisted file, and compare these to the minTimestamp established from the distributedCache, if there was one + final File persistenceFile = getPersistenceFile(); + if (persistenceFile.exists()) { + final Properties props = new Properties(); + + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } + + final String locallyPersistedValue = props.getProperty(path); + if (locallyPersistedValue != null) { + final EntityListing listing = deserialize(locallyPersistedValue); + final long localTimestamp = listing.getLatestTimestamp().getTime(); + // if the local file's latest timestamp is beyond that of the value provided from the cache, replace + if (minTimestamp == null || localTimestamp > minTimestamp) { + minTimestamp = localTimestamp; + } + } + + // delete the local file, since it is no longer needed + if (persistenceFile.exists() && !persistenceFile.delete()) { + getLogger().warn("Migrated state but failed to delete local persistence file"); + } + } + + if (minTimestamp != null) { + persist(minTimestamp, minTimestamp, stateManager, scope); + } + } + + private void persist(final long listingTimestamp, final long processedTimestamp, final StateManager stateManager, final Scope scope) throws IOException { + final Map<String, String> updatedState = new HashMap<>(1); + updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp)); + updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp)); + stateManager.setState(updatedState, scope); + } + + protected String getKey(final String directory) { + return getIdentifier() + ".lastListingTime." 
+ directory; + } + + private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(serializedState); + return mapper.readValue(jsonNode, EntityListing.class); + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + Long minTimestamp = lastListingTime; + + if (this.lastListingTime == null || this.lastProcessedTime == null || justElectedPrimaryNode) { + try { + // Attempt to retrieve state from the state manager if a last listing was not yet established or + // if just elected the primary node + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); + final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY); + final String lastProcessedString= stateMap.get(PROCESSED_TIMESTAMP_KEY); + if (lastProcessedString != null) { + this.lastProcessedTime = Long.parseLong(lastProcessedString); + } + if (listingTimestampString != null) { + minTimestamp = Long.parseLong(listingTimestampString); + // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates + if (minTimestamp == this.lastListingTime) { + context.yield(); + return; + } else { + this.lastListingTime = minTimestamp; + } + } + justElectedPrimaryNode = false; + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. 
Will not perform listing until this is accomplished."); + context.yield(); + return; + } + } + + final List<T> entityList; + final long currentListingTimestamp = System.nanoTime(); + try { + // track of when this last executed for consideration of the lag nanos + entityList = performListing(context, minTimestamp); + } catch (final IOException e) { + getLogger().error("Failed to perform listing on remote host due to {}", e); + context.yield(); + return; + } + + if (entityList == null || entityList.isEmpty()) { + context.yield(); + return; + } + + Long latestListingTimestamp = null; + final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); + + // Build a sorted map to determine the latest possible entries + for (final T entity : entityList) { + final long entityTimestamp = entity.getTimestamp(); + // New entries are all those that occur at or after the associated timestamp + final boolean newEntry = minTimestamp == null || entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime; + + if (newEntry) { + List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp()); + if (entitiesForTimestamp == null) { + entitiesForTimestamp = new ArrayList<T>(); + orderedEntries.put(entity.getTimestamp(), entitiesForTimestamp); + } + entitiesForTimestamp.add(entity); + } + } + + int flowfilesCreated = 0; + + if (orderedEntries.size() > 0) { + latestListingTimestamp = orderedEntries.lastKey(); + + // If the last listing time is equal to the newest entries previously seen, + // another iteration has occurred without new files and special handling is needed to avoid starvation + if (latestListingTimestamp.equals(lastListingTime)) { + /* We are done when either: + * - the latest listing timestamp is If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run + * - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over + */ + if (System.nanoTime() - 
lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(lastProcessedTime)) { + context.yield(); + return; + } + + } else if (latestListingTimestamp >= currentListingTimestamp - LISTING_LAG_NANOS) { + // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data + orderedEntries.remove(latestListingTimestamp); + } + + for (List<T> timestampEntities : orderedEntries.values()) { + for (T entity : timestampEntities) { + // Create the FlowFile for this path. + final Map<String, String> attributes = createAttributes(entity, context); + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + flowfilesCreated++; + } + } + } + + // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated + if (latestListingTimestamp != null) { + boolean processedNewFiles = flowfilesCreated > 0; + if (processedNewFiles) { + // If there have been files created, update the last timestamp we processed + lastProcessedTime = orderedEntries.lastKey(); + getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated}); + session.commit(); + } + + lastRunTime = System.nanoTime(); + + if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) { + // We have performed a listing and pushed any FlowFiles out that may have been generated + // Now, we need to persist state about the Last Modified timestamp of the newest file + // that we evaluated. We do this in order to avoid pulling in the same file twice. + // However, we want to save the state both locally and remotely. + // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the + // previously Primary Node left off. 
+ // We also store the state locally so that if the node is restarted, and the node cannot contact + // the distributed state cache, the node can continue to run (if it is primary node). + try { + lastListingTime = latestListingTimestamp; + persist(latestListingTimestamp, lastProcessedTime, context.getStateManager(), getStateScope(context)); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " + + "if another node begins executing this Processor, data duplication may occur.", ioe); + } + } + + } else { + getLogger().debug("There is no data to list. Yielding."); + context.yield(); + + // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system + if (lastListingTime == null) { + lastListingTime = 0L; + } + + return; + } + } + + private void resetTimeStates() { + lastListingTime = null; + lastProcessedTime = 0L; + lastRunTime = 0L; + } + + /** + * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity + * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no + * content. The attributes that will be included are exactly the attributes that are returned by this method. + * + * @param entity the entity represented by the FlowFile + * @param context the ProcessContext for obtaining configuration information + * @return a Map of attributes for this entity + */ + protected abstract Map<String, String> createAttributes(T entity, ProcessContext context); + + /** + * Returns the path to perform a listing on. + * Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only + * within that path. This method is responsible for returning the path that is currently being polled for entities. 
If this does concept + * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method. + * + * @param context the ProcessContex to use in order to obtain configuration + * @return the path that is to be used to perform the listing, or <code>null</code> if not applicable. + */ + protected abstract String getPath(final ProcessContext context); + + /** + * Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted" + * by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is + * provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to the minTimestamp + * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient + * if the filtering can be performed on the server side prior to retrieving the information. + * + * @param context the ProcessContex to use in order to pull the appropriate entities + * @param minTimestamp the minimum timestamp of entities that should be returned. + * @return a Listing of entities that have a timestamp >= minTimestamp + */ + protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException; + + /** + * Determines whether or not the listing must be reset if the value of the given property is changed + * + * @param property the property that has changed + * @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise. 
+ */ + protected abstract boolean isListingResetNecessary(final PropertyDescriptor property); + + /** + * Returns a Scope that specifies where the state should be managed for this Processor + * + * @param context the ProcessContext to use in order to make a determination + * @return a Scope that specifies where the state should be managed for this Processor + */ + protected abstract Scope getStateScope(final ProcessContext context); + + + private static class StringSerDe implements Serializer<String>, Deserializer<String> { + @Override + public String deserialize(final byte[] value) throws DeserializationException, IOException { + if (value == null) { + return null; + } + + return new String(value, StandardCharsets.UTF_8); + } + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + } +}
