Repository: nifi Updated Branches: refs/heads/master e30a21cfc -> 0229a5c10
NIFI-5698: Fixed DeleteAzureBlobStorage bug This closes #3073. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0229a5c1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0229a5c1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0229a5c1 Branch: refs/heads/master Commit: 0229a5c10953b0b003e3613e4a172a1641c6452d Parents: e30a21c Author: zenfenan <[email protected]> Authored: Sun Oct 14 13:18:25 2018 +0530 Committer: Koji Kawamura <[email protected]> Committed: Mon Oct 15 10:27:15 2018 +0900 ---------------------------------------------------------------------- .../azure/storage/DeleteAzureBlobStorage.java | 39 +++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0229a5c1/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java index a3f66d8..603bc69 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java @@ -21,28 +21,56 @@ import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; - @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, PutAzureBlobStorage.class}) @CapabilityDescription("Deletes the provided blob from Azure Storage") @InputRequirement(Requirement.INPUT_REQUIRED) public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor { + private static final AllowableValue DELETE_SNAPSHOTS_NONE = new AllowableValue(DeleteSnapshotsOption.NONE.name(), "None", "Delete the blob only."); + + private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new AllowableValue(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS.name(), "Include Snapshots", "Delete the blob and its snapshots."); + + private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new AllowableValue(DeleteSnapshotsOption.DELETE_SNAPSHOTS_ONLY.name(), "Delete Snapshots Only", "Delete only the blob's snapshots."); + + private static final PropertyDescriptor DELETE_SNAPSHOTS_OPTION = new PropertyDescriptor.Builder() + .name("delete-snapshots-option") + .displayName("Delete Snapshots Option") + .description("Specifies the snapshot deletion options to be used when deleting a blob.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(DELETE_SNAPSHOTS_NONE, DELETE_SNAPSHOTS_ALSO, DELETE_SNAPSHOTS_ONLY) + .defaultValue(DELETE_SNAPSHOTS_NONE.getValue()) + .required(true) + .build(); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(DELETE_SNAPSHOTS_OPTION); + return properties; + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -52,8 +80,9 @@ public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor { } final long startNanos = System.nanoTime(); - String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); - String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); + final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); + final String deleteSnapshotOptions = context.getProperty(DELETE_SNAPSHOTS_OPTION).getValue(); try { CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); @@ -62,12 +91,12 @@ public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor { final OperationContext operationContext = new OperationContext(); AzureStorageUtils.setProxy(operationContext, context); - blob.deleteIfExists(null, null, null, operationContext); + blob.deleteIfExists(DeleteSnapshotsOption.valueOf(deleteSnapshotOptions), null, null, operationContext); session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis); - }catch ( StorageException | URISyntaxException e) { + } catch ( StorageException | URISyntaxException e) { getLogger().error("Failed to delete the specified blob {} from Azure Storage. Routing to failure", new Object[]{blobPath}, e); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE);
