Repository: nifi
Updated Branches:
  refs/heads/master c118e9623 -> 72f8999b1


NIFI-5015: Implemented Azure Queue Storage processors

Signed-off-by: Pierre Villard <[email protected]>

This closes #2611.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72f8999b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72f8999b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72f8999b

Branch: refs/heads/master
Commit: 72f8999b1518d7a6e8e09ba29ffb9b883c9b09d4
Parents: c118e96
Author: zenfenan <[email protected]>
Authored: Mon Apr 2 07:48:37 2018 +0530
Committer: Pierre Villard <[email protected]>
Committed: Wed Apr 25 20:07:15 2018 +0200

----------------------------------------------------------------------
 .../queue/AbstractAzureQueueStorage.java        | 110 ++++++++++
 .../storage/queue/GetAzureQueueStorage.java     | 205 +++++++++++++++++++
 .../storage/queue/PutAzureQueueStorage.java     | 152 ++++++++++++++
 .../org.apache.nifi.processor.Processor         |   4 +-
 .../processors/azure/storage/AzureTestUtil.java |  43 +++-
 .../storage/queue/GetAzureQueueStorageIT.java   | 149 ++++++++++++++
 .../storage/queue/PutAzureQueueStorageIT.java   | 118 +++++++++++
 .../storage/queue/TestGetAzureQueueStorage.java |  72 +++++++
 .../storage/queue/TestPutAzureQueueStorage.java |  79 +++++++
 9 files changed, 924 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
new file mode 100644
index 0000000..caab936
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
+
+    // Required property holding the queue name. It must be non-empty and is declared
+    // with FlowFile-attribute Expression Language scope.
+    public static final PropertyDescriptor QUEUE = new 
PropertyDescriptor.Builder()
+            .name("storage-queue-name")
+            .displayName("Queue Name")
+            .description("Name of the Azure Storage Queue")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    // Relationship named "success".
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All successfully processed FlowFiles are routed to 
this relationship")
+            .build();
+
+    // Relationship named "failure".
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Unsuccessful operations will be transferred to the 
failure relationship.")
+            .build();
+
+    // Template filled with account name and account key when no SAS token is configured.
+    private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+    // Template filled with the account name to form the queue endpoint used with a SAS token.
+    private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
+
+    // Unmodifiable set containing REL_SUCCESS and REL_FAILURE; returned by getRelationships().
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    /**
+     * Returns the shared, unmodifiable relationship set (success and failure).
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    protected final CloudQueueClient createCloudQueueClient(final 
ProcessContext context, final FlowFile flowFile) {
+        final String storageAccountName;
+        final String storageAccountKey;
+        final String sasToken;
+        final String connectionString;
+
+        if (flowFile == null) {
+            storageAccountName = 
context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+            storageAccountKey = 
context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+            sasToken = 
context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+        } else {
+            storageAccountName = 
context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            storageAccountKey = 
context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            sasToken = 
context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
+        }
+
+        CloudQueueClient cloudQueueClient;
+        try {
+            if (StringUtils.isNoneBlank(sasToken)) {
+                connectionString = String.format(FORMAT_QUEUE_BASE_URI, 
storageAccountName);
+                StorageCredentials storageCredentials = new 
StorageCredentialsSharedAccessSignature(sasToken);
+                cloudQueueClient = new CloudQueueClient(new 
URI(connectionString), storageCredentials);
+            } else {
+                connectionString = 
String.format(FORMAT_QUEUE_CONNECTION_STRING, storageAccountName, 
storageAccountKey);
+                CloudStorageAccount storageAccount = 
CloudStorageAccount.parse(connectionString);
+                cloudQueueClient = storageAccount.createCloudQueueClient();
+            }
+        } catch (IllegalArgumentException | URISyntaxException e) {
+            getLogger().error("Invalid connection string URI for '{}'", new 
Object[]{context.getName()}, e);
+            throw new IllegalArgumentException(e);
+        } catch (InvalidKeyException e) {
+            getLogger().error("Invalid connection credentials for '{}'", new 
Object[]{context.getName()}, e);
+            throw new IllegalArgumentException(e);
+        }
+        return cloudQueueClient;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
new file mode 100644
index 0000000..d6e510e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. 
The retrieved messages will be deleted from the queue by default. If the 
requirement is " +
+        "to consume messages without deleting them, set 'Auto Delete Messages' 
to 'false'. Note: There might be chances of receiving duplicates in situations 
like " +
+        "when a message is received but was unable to be deleted from the 
queue due to some unexpected situations.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "azure.queue.uri", description = "The 
absolute URI of the configured Azure Queue Storage"),
+        @WritesAttribute(attribute = "azure.queue.insertionTime", description 
= "The time when the message was inserted into the queue storage"),
+        @WritesAttribute(attribute = "azure.queue.expirationTime", description 
= "The time when the message will expire from the queue storage"),
+        @WritesAttribute(attribute = "azure.queue.messageId", description = 
"The ID of the retrieved message"),
+        @WritesAttribute(attribute = "azure.queue.popReceipt", description = 
"The pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
+
+    // Required boolean property (default "true"); read in onTrigger to decide whether
+    // retrieved messages are deleted from the queue.
+    public static final PropertyDescriptor AUTO_DELETE = new 
PropertyDescriptor.Builder()
+            .name("auto-delete-messages")
+            .displayName("Auto Delete Messages")
+            .description("Specifies whether the received message is to be 
automatically deleted from the queue.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    // Required number of messages requested per retrieval (default "32"). The validator
+    // is created with bounds 1 and 32 -- presumably inclusive, TODO confirm against
+    // StandardValidators.createLongValidator.
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("batch-size")
+            .displayName("Batch Size")
+            .description("The number of messages to be retrieved from the 
queue.")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 32, true))
+            .defaultValue("32")
+            .build();
+
+    // Required time period (default "30 secs"); customValidate additionally rejects
+    // values that are not greater than 0 seconds.
+    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("visibility-timeout")
+            .displayName("Visibility Timeout")
+            .description("The duration during which the retrieved message 
should be invisible to other consumers.")
+            .required(true)
+            .defaultValue("30 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Unmodifiable list of the properties this processor supports, in the order given here.
+    private static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(Arrays.asList(
+            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, 
AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
+            BATCH_SIZE, VISIBILITY_TIMEOUT));
+
+    /**
+     * Returns the unmodifiable list of properties supported by this processor.
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Returns only the success relationship, overriding the success/failure set of the parent class.
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+        final int visibilityTimeoutInSecs = 
context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final boolean autoDelete = 
context.getProperty(AUTO_DELETE).asBoolean();
+        final String queue = 
context.getProperty(QUEUE).evaluateAttributeExpressions().getValue().toLowerCase();
+
+        final Iterable<CloudQueueMessage> retrievedMessagesIterable;
+
+        CloudQueueClient cloudQueueClient;
+        CloudQueue cloudQueue;
+
+        try {
+            cloudQueueClient = createCloudQueueClient(context, null);
+            cloudQueue = cloudQueueClient.getQueueReference(queue);
+            retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, 
visibilityTimeoutInSecs, null, null);
+        } catch (URISyntaxException | StorageException e) {
+            getLogger().error("Failed to retrieve messages from the provided 
Azure Storage Queue due to {}", new Object[] {e});
+            context.yield();
+            return;
+        }
+
+        final List<CloudQueueMessage> cloudQueueMessages = 
toList(retrievedMessagesIterable);
+
+        for (final CloudQueueMessage message : cloudQueueMessages) {
+            FlowFile flowFile = session.create();
+
+            final Map<String, String> attributes = new HashMap<>();
+
+            attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
+            attributes.put("azure.queue.insertionTime", 
message.getInsertionTime().toString());
+            attributes.put("azure.queue.expirationTime", 
message.getExpirationTime().toString());
+            attributes.put("azure.queue.messageId", message.getMessageId());
+            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = session.write(flowFile, out -> {
+                try {
+                    out.write(message.getMessageContentAsByte());
+                } catch (StorageException e) {
+                    getLogger().error("Failed to write the retrieved queue 
message to FlowFile content due to {}", new Object[] {e});
+                    context.yield();
+                }
+            });
+
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, 
cloudQueue.getStorageUri().toString());
+        }
+
+        if(autoDelete) {
+            session.commit();
+
+            for (final CloudQueueMessage message : cloudQueueMessages) {
+                try {
+                    cloudQueue.deleteMessage(message);
+                } catch (StorageException e) {
+                    getLogger().error("Failed to delete the retrieved message 
with the id {} from the queue due to {}",
+                            new Object[] {message.getMessageId(), e});
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Extends the inherited validation with a check that the visibility timeout, converted to
+     * whole seconds, is greater than zero.
+     *
+     * @param validationContext context the property value is read from
+     * @return the inherited validation results plus, if applicable, the visibility-timeout failure
+     */
+    @Override
+    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        final int timeoutInSecs = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+        // Nothing to add when the timeout is positive.
+        if (timeoutInSecs > 0) {
+            return results;
+        }
+
+        final ValidationResult invalidTimeout = new ValidationResult.Builder()
+                .valid(false)
+                .subject(VISIBILITY_TIMEOUT.getDisplayName())
+                .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs")
+                .build();
+        results.add(invalidTimeout);
+
+        return results;
+    }
+
+    /**
+     * Returns the given messages as a list: the argument itself when it already is a
+     * {@link List}, otherwise a new list filled in iteration order (empty for {@code null}).
+     */
+    private List<CloudQueueMessage> toList(Iterable<CloudQueueMessage> iterable) {
+        if (iterable instanceof List) {
+            return (List<CloudQueueMessage>) iterable;
+        }
+
+        final ArrayList<CloudQueueMessage> collected = new ArrayList<>();
+        if (iterable == null) {
+            return collected;
+        }
+
+        iterable.forEach(collected::add);
+        return collected;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
new file mode 100644
index 0000000..c289a74
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@SeeAlso({GetAzureQueueStorage.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
+@CapabilityDescription("Writes the content of the incoming FlowFiles to the 
configured Azure Queue Storage.")
+public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
+
+    // Optional time period (default "7 days"); customValidate rejects values above 7 days.
+    public static final PropertyDescriptor TTL = new 
PropertyDescriptor.Builder()
+            .name("time-to-live")
+            .displayName("TTL")
+            .description("Maximum time to allow the message to be in the 
queue. If left empty, the default value of 7 days will be used.")
+            .required(false)
+            .defaultValue("7 days")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Optional time period (default "0 secs"); customValidate checks it against the TTL.
+    public static final PropertyDescriptor VISIBILITY_DELAY = new 
PropertyDescriptor.Builder()
+            .name("visibility-delay")
+            .displayName("Visibility Delay")
+            .description("The length of time during which the message will be 
invisible, starting when it is added to the queue. " +
+                         "This value must be greater than or equal to 0 and 
less than the TTL value.")
+            .required(false)
+            .defaultValue("0 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Unmodifiable list of the properties this processor supports, in the order given here.
+    private static final List<PropertyDescriptor> properties =  
Collections.unmodifiableList(Arrays.asList(
+            AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, 
AzureStorageUtils.PROP_SAS_TOKEN, TTL,
+            QUEUE, VISIBILITY_DELAY));
+
+    /**
+     * Returns the unmodifiable list of properties supported by this processor.
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        final String flowFileContent = baos.toString();
+
+        CloudQueueMessage message = new CloudQueueMessage(flowFileContent);
+        CloudQueueClient cloudQueueClient;
+        CloudQueue cloudQueue;
+
+        final int ttl = 
context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final int delay = 
context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final String queue = 
context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase();
+
+        try {
+            cloudQueueClient = createCloudQueueClient(context, flowFile);
+            cloudQueue = cloudQueueClient.getQueueReference(queue);
+            cloudQueue.addMessage(message, ttl, delay, null, null);
+        } catch (URISyntaxException | StorageException e) {
+            getLogger().error("Failed to write the message to Azure Queue 
Storage due to {}", new Object[]{e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        session.getProvenanceReporter().send(flowFile, 
cloudQueue.getUri().toString(), transmissionMillis);
+    }
+
+    /**
+     * Extends the inherited validation with two checks:
+     * <ul>
+     *   <li>a set TTL must not exceed 7 days (604800 seconds);</li>
+     *   <li>a set visibility delay must be greater than or equal to 0 and less than the TTL.</li>
+     * </ul>
+     * A delay equal to a non-zero TTL is rejected, matching the "less than" wording of the
+     * property description and of the validation message.
+     *
+     * @param validationContext context the property values are read from
+     * @return the inherited validation results plus any TTL / visibility-delay failures
+     */
+    @Override
+    public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean ttlSet = validationContext.getProperty(TTL).isSet();
+        final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet();
+
+        final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+        if (ttlSet) {
+            final int sevenDaysInSecs = 604800; // i.e. 7 * 24 * 60 * 60
+
+            if (ttl > sevenDaysInSecs) {
+                problems.add(new ValidationResult.Builder()
+                                                 .subject(TTL.getDisplayName())
+                                                 .valid(false)
+                                                 .explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")
+                                                 .build());
+            }
+        }
+
+        if (delaySet) {
+            final int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
+
+            final boolean outOfRange = delay < 0 || delay > ttl;
+            // The delay must be strictly less than the TTL. The TTL == 0 / delay == 0 combination
+            // was accepted before and is left accepted.
+            // NOTE(review): confirm how the SDK interprets a zero TTL.
+            final boolean equalsNonZeroTtl = ttl != 0 && delay == ttl;
+
+            if (outOfRange || equalsNonZeroTtl) {
+                problems.add(new ValidationResult.Builder()
+                                                 .subject(VISIBILITY_DELAY.getDisplayName())
+                                                 .valid(false)
+                                                 .explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName())
+                                                 .build());
+            }
+        }
+
+        return problems;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index fa34294..61b0df3 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,4 +18,6 @@ org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub
 org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
 org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
-org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
+org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
+org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
index 4396c70..6d3a692 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
@@ -23,9 +23,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
+import java.util.Iterator;
 import java.util.Properties;
 
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
 import org.apache.nifi.util.file.FileUtils;
 
 import com.microsoft.azure.storage.CloudStorageAccount;
@@ -34,11 +37,17 @@ import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
 public class AzureTestUtil {
-    private static final String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
-    static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
 
     private static final Properties CONFIG;
-    static final String TEST_BLOB_NAME = "testing";
+
+    private static final String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+    private static final String FORMAT_CONNECTION_STRING = 
"DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+
+    public static final String TEST_BLOB_NAME = "testing";
+    public static final String TEST_STORAGE_QUEUE = "testqueue";
+    public static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
+
+    public static CloudQueue cloudQueue;
 
     static {
         final FileInputStream fis;
@@ -67,10 +76,30 @@ public class AzureTestUtil {
     }
 
     public static CloudBlobContainer getContainer(String containerName) throws 
InvalidKeyException, URISyntaxException, StorageException {
-        String storageConnectionString = 
String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, 
getAccountName(), getAccountKey());
-        CloudStorageAccount storageAccount = 
CloudStorageAccount.parse(storageConnectionString);
-        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+        CloudBlobClient blobClient = 
getStorageAccount().createCloudBlobClient();
         return blobClient.getContainerReference(containerName);
     }
 
+    /**
+     * Looks up the queue called {@code queueName} on the test storage account.
+     * The reference is also stored in {@link #cloudQueue}, which {@link #getQueueCount()} reads.
+     *
+     * @param queueName name of the queue to reference
+     * @return the queue reference that was just stored
+     */
+    public static CloudQueue getQueue(String queueName) throws URISyntaxException, InvalidKeyException, StorageException {
+        cloudQueue = getStorageAccount().createCloudQueueClient().getQueueReference(queueName);
+        return cloudQueue;
+    }
+
+    private static CloudStorageAccount getStorageAccount() throws 
URISyntaxException, InvalidKeyException {
+        String storageConnectionString = 
String.format(FORMAT_CONNECTION_STRING, getAccountName(), getAccountKey());
+        return CloudStorageAccount.parse(storageConnectionString);
+    }
+
+    /**
+     * Counts the messages that can currently be retrieved from the queue last obtained
+     * through {@link #getQueue(String)}.
+     *
+     * <p>At most 10 messages are requested, so the result never exceeds 10.
+     * NOTE(review): per the Azure SDK signature the second argument (1) is the visibility
+     * timeout in seconds, i.e. counted messages are presumably hidden from other readers
+     * for about a second after this call - confirm against the SDK documentation.
+     *
+     * @return number of messages returned by the retrieval
+     * @throws IllegalStateException if {@link #getQueue(String)} has not been called yet
+     * @throws StorageException if the storage service reports an error
+     */
+    public static int getQueueCount() throws StorageException {
+        // Fail with a clear message instead of a bare NullPointerException.
+        if (cloudQueue == null) {
+            throw new IllegalStateException("getQueue(String) must be called before getQueueCount()");
+        }
+
+        final Iterator<CloudQueueMessage> retrievedMessages = cloudQueue.retrieveMessages(10, 1, null, null).iterator();
+        int count = 0;
+
+        while (retrievedMessages.hasNext()) {
+            retrievedMessages.next();
+            count++;
+        }
+
+        return count;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
new file mode 100644
index 0000000..1711bbd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import org.apache.nifi.processors.azure.storage.AzureTestUtil;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.List;
+
+/**
+ * Integration tests for {@link GetAzureQueueStorage}. They talk to the storage account whose
+ * name and key are supplied by {@link AzureTestUtil}.
+ */
+public class GetAzureQueueStorageIT {
+
+    /**
+     * Queue owned by this class for a single run. {@link #cleanup()} deletes the queue, and Azure
+     * refuses to re-create a queue while a same-named one is still being deleted, so a fixed name
+     * shared with other tests (or with a quick re-run) can make {@link #setup()} fail.
+     */
+    private static final String QUEUE_NAME = AzureTestUtil.TEST_STORAGE_QUEUE + "-get-" + System.currentTimeMillis();
+
+    private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class);
+    private static CloudQueue cloudQueue;
+
+    @BeforeClass
+    public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
+        cloudQueue = AzureTestUtil.getQueue(QUEUE_NAME);
+        cloudQueue.createIfNotExists();
+    }
+
+    /** With auto delete off, all three messages must still be in the queue after the run. */
+    @Test
+    public void testGetWithAutoDeleteFalse() throws StorageException, InterruptedException {
+        cloudQueue.clear();
+        insertDummyMessages();
+
+        configureRunner("10", "false");
+
+        runner.run(1);
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
+        Assert.assertFalse(mockFlowFiles.isEmpty());
+
+        // Wait longer than the configured "1 secs" visibility timeout before checking the queue.
+        Thread.sleep(1500);
+        cloudQueue.downloadAttributes();
+        Assert.assertEquals(3, cloudQueue.getApproximateMessageCount());
+    }
+
+    /** Properties given as expression language; with auto delete on, the queue must end up empty. */
+    @Test
+    public void testGetWithELAndAutoDeleteTrue() throws StorageException, InterruptedException {
+        cloudQueue.clear();
+        insertDummyMessages();
+
+        runner.setValidateExpressionUsage(true);
+
+        runner.setVariable("account.name", AzureTestUtil.getAccountName());
+        runner.setVariable("account.key", AzureTestUtil.getAccountKey());
+        runner.setVariable("queue.name", QUEUE_NAME);
+
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
+        runner.setProperty(GetAzureQueueStorage.QUEUE, "${queue.name}");
+        runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
+        runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true");
+        runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
+
+        runner.run(1);
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
+        Assert.assertFalse(mockFlowFiles.isEmpty());
+
+        Thread.sleep(1500);
+        cloudQueue.downloadAttributes();
+        Assert.assertEquals(0, cloudQueue.getApproximateMessageCount());
+    }
+
+    /** Fetched messages must be hidden right after the run and visible again after the timeout. */
+    @Test
+    public void testGetWithVisibilityTimeout() throws StorageException, InterruptedException {
+        cloudQueue.clear();
+        insertDummyMessages();
+
+        configureRunner("10", "false");
+
+        runner.run(1);
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
+        Assert.assertFalse(mockFlowFiles.isEmpty());
+        Assert.assertEquals(0, AzureTestUtil.getQueueCount());
+
+        Thread.sleep(1500);
+        Assert.assertEquals(3, AzureTestUtil.getQueueCount());
+    }
+
+    /** A batch size of 2 yields two flow files on the first run and the third one on the second. */
+    @Test
+    public void testGetWithBatchSize() throws StorageException {
+        cloudQueue.clear();
+        insertDummyMessages();
+
+        configureRunner("2", "true");
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 2);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
+
+    }
+
+    /**
+     * Sets the literal (non expression language) properties shared by most tests; the
+     * visibility timeout is always "1 secs".
+     */
+    private void configureRunner(final String batchSize, final String autoDelete) {
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
+        runner.setProperty(GetAzureQueueStorage.QUEUE, QUEUE_NAME);
+        runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, batchSize);
+        runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, autoDelete);
+        runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
+    }
+
+    /**
+     * Adds three messages to the queue.
+     * NOTE(review): 604800 is presumably the time-to-live in seconds (7 days) and 0 the initial
+     * visibility delay - confirm against the CloudQueue.addMessage signature.
+     */
+    private static void insertDummyMessages() throws StorageException {
+        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null);
+        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null);
+        cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null);
+    }
+
+    @AfterClass
+    public static void cleanup() throws StorageException {
+        cloudQueue.deleteIfExists();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
new file mode 100644
index 0000000..e02f16d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import org.apache.nifi.processors.azure.storage.AzureTestUtil;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+/**
+ * Integration tests for {@link PutAzureQueueStorage}. They talk to the storage account whose
+ * name and key are supplied by {@link AzureTestUtil}.
+ */
+public class PutAzureQueueStorageIT {
+
+    /**
+     * Queue owned by this class for a single run. {@link #cleanup()} deletes the queue, and Azure
+     * refuses to re-create a queue while a same-named one is still being deleted, so a fixed name
+     * shared with other tests (or with a quick re-run) can make {@link #setup()} fail.
+     */
+    private static final String QUEUE_NAME = AzureTestUtil.TEST_STORAGE_QUEUE + "-put-" + System.currentTimeMillis();
+
+    private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class);
+    private static CloudQueue cloudQueue;
+
+    @BeforeClass
+    public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
+        cloudQueue = AzureTestUtil.getQueue(QUEUE_NAME);
+        cloudQueue.createIfNotExists();
+    }
+
+    /** A single flow file with literal properties is routed to success. */
+    @Test
+    public void testSimplePut() throws InvalidKeyException, StorageException, URISyntaxException {
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
+        runner.setProperty(PutAzureQueueStorage.QUEUE, QUEUE_NAME);
+
+        runner.enqueue("Dummy message");
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
+    }
+
+    /** Same as the simple put, but every property is given as expression language. */
+    @Test
+    public void testSimplePutWithEL() throws StorageException, URISyntaxException, InvalidKeyException {
+        runner.setValidateExpressionUsage(true);
+
+        runner.setVariable("account.name", AzureTestUtil.getAccountName());
+        runner.setVariable("account.key", AzureTestUtil.getAccountKey());
+        runner.setVariable("queue.name", QUEUE_NAME);
+
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
+        runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}");
+
+        runner.enqueue("Dummy message");
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
+    }
+
+    /** A message put with a 2 second TTL is retrievable at first and gone after the TTL. */
+    @Test
+    public void testPutWithTTL() throws StorageException, InterruptedException {
+        cloudQueue.clear();
+
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
+        runner.setProperty(PutAzureQueueStorage.QUEUE, QUEUE_NAME);
+        runner.setProperty(PutAzureQueueStorage.TTL, "2 secs");
+
+        runner.enqueue("Dummy message");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
+        Assert.assertEquals(1, AzureTestUtil.getQueueCount());
+
+        // Wait longer than the 2 second TTL before counting again.
+        Thread.sleep(2400);
+        Assert.assertEquals(0, AzureTestUtil.getQueueCount());
+    }
+
+    /** A message put with a 2 second visibility delay is hidden at first and retrievable later. */
+    @Test
+    public void testPutWithVisibilityDelay() throws StorageException, InterruptedException {
+        // One clear is enough; the original cleared the queue twice in a row.
+        cloudQueue.clear();
+
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
+        runner.setProperty(PutAzureQueueStorage.QUEUE, QUEUE_NAME);
+        runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs");
+
+        runner.enqueue("Dummy message");
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
+        Assert.assertEquals(0, AzureTestUtil.getQueueCount());
+
+        // Wait longer than the 2 second visibility delay before counting again.
+        Thread.sleep(2400);
+        Assert.assertEquals(1, AzureTestUtil.getQueueCount());
+    }
+
+    @AfterClass
+    public static void cleanup() throws StorageException {
+        cloudQueue.deleteIfExists();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage.java
new file mode 100644
index 0000000..aba779c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+
+/**
+ * Property validation tests for {@link GetAzureQueueStorage}; they use dummy credentials only.
+ */
+public class TestGetAzureQueueStorage {
+
+    private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class);
+
+    /** A visibility timeout of "10 secs" produces no validation results. */
+    @Test
+    public void testValidVisibilityTimeout() {
+        final Collection<ValidationResult> validationResults = validateWithVisibilityTimeout("10 secs");
+
+        Assert.assertEquals(0, validationResults.size());
+    }
+
+    /** A visibility timeout of "0 secs" produces exactly one result carrying the expected explanation. */
+    @Test
+    public void testInvalidVisibilityTimeout() {
+        final Collection<ValidationResult> validationResults = validateWithVisibilityTimeout("0 secs");
+
+        Assert.assertEquals(1, validationResults.size());
+        final Iterator<ValidationResult> resultIterator = validationResults.iterator();
+        Assert.assertTrue(resultIterator.next().getExplanation().contains("should be greater than 0 secs"));
+    }
+
+    /**
+     * Applies the dummy configuration with the given visibility timeout and returns the
+     * validation results, or an empty set when the runner's context is not a MockProcessContext.
+     */
+    private Collection<ValidationResult> validateWithVisibilityTimeout(final String visibilityTimeout) {
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
+        runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
+        runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
+        runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, visibilityTimeout);
+
+        final ProcessContext context = runner.getProcessContext();
+        if (!(context instanceof MockProcessContext)) {
+            return new HashSet<>();
+        }
+        return ((MockProcessContext) context).validate();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/72f8999b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
new file mode 100644
index 0000000..d1e552e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.microsoft.azure.storage.StorageException;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+
+public class TestPutAzureQueueStorage {
+
+    private final TestRunner runner = 
TestRunners.newTestRunner(PutAzureQueueStorage.class);
+
+    @Test
+    public void testInvalidTTLAndVisibilityDelay() throws StorageException, 
URISyntaxException, InvalidKeyException {
+
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
+        runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
+        runner.setProperty(PutAzureQueueStorage.TTL, "8 days");
+        runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "9 days");
+
+        ProcessContext processContext = runner.getProcessContext();
+        Collection<ValidationResult> results = new HashSet<>();
+        if (processContext instanceof MockProcessContext) {
+            results = ((MockProcessContext) processContext).validate();
+        }
+
+        Assert.assertEquals(2, results.size());
+
+        Iterator<ValidationResult> iterator = results.iterator();
+        
Assert.assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.TTL.getDisplayName()
 + " exceeds the allowed limit of 7 days. Set a value less than 7 days"));
+        
Assert.assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.VISIBILITY_DELAY.getDisplayName()
 + " should be greater than or equal to 0 and less than"));
+    }
+
+    @Test
+    public void testAllValidProperties() {
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
+        runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
+        runner.setProperty(PutAzureQueueStorage.TTL, "6 days");
+        runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "5 days");
+
+        ProcessContext processContext = runner.getProcessContext();
+        Collection<ValidationResult> results = new HashSet<>();
+        if (processContext instanceof MockProcessContext) {
+            results = ((MockProcessContext) processContext).validate();
+        }
+
+        Assert.assertEquals(0, results.size());
+    }
+
+}

Reply via email to