This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b11373af7b NIFI-11549 Added AzureQueueStorage_v12 Processors
b11373af7b is described below

commit b11373af7b017f83c3b579c6aefc7cb7a1816bfd
Author: Emilio Setiadarma <[email protected]>
AuthorDate: Tue May 16 16:50:02 2023 -0700

    NIFI-11549 Added AzureQueueStorage_v12 Processors
    
    - Deprecated GetAzureQueueStorage and PutAzureQueueStorage
    
    This closes #7269
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-azure-processors/pom.xml                  |   4 +
 .../queue/AbstractAzureQueueStorage_v12.java       | 190 +++++++++++++++++++
 .../azure/storage/queue/GetAzureQueueStorage.java  |   2 +
 .../storage/queue/GetAzureQueueStorage_v12.java    | 210 +++++++++++++++++++++
 .../azure/storage/queue/PutAzureQueueStorage.java  |   2 +
 .../storage/queue/PutAzureQueueStorage_v12.java    | 162 ++++++++++++++++
 ...ureStorageCredentialsControllerService_v12.java |   6 +-
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../queue/AbstractTestAzureQueueStorage_v12.java   |  36 ++++
 .../queue/TestGetAzureQueueStorage_v12.java        |  79 ++++++++
 .../queue/TestPutAzureQueueStorage_v12.java        | 100 ++++++++++
 11 files changed, 790 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 87888cac8c..c42c0c0e23 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -92,6 +92,10 @@
             <groupId>com.azure</groupId>
             <artifactId>azure-security-keyvault-keys</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-queue</artifactId>
+        </dependency>
         <!-- Legacy Microsoft Azure Libraries -->
         <dependency>
             <groupId>com.microsoft.azure</groupId>
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java
new file mode 100644
index 0000000000..ce1ad8f537
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.queue.QueueClient;
+import com.azure.storage.queue.QueueClientBuilder;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.AzureServiceEndpoints;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
+import reactor.core.publisher.Mono;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractAzureQueueStorage_v12 extends AbstractProcessor {
+    /** Name of the queue to operate on; Expression Language is evaluated against FlowFile attributes. */
+    public static final PropertyDescriptor QUEUE_NAME = new PropertyDescriptor.Builder()
+            .name("Queue Name")
+            .displayName("Queue Name")
+            .description("Name of the Azure Storage Queue")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /**
+     * Endpoint suffix appended to the storage account name when the queue endpoint is built.
+     * Derived from {@link AzureStorageUtils#ENDPOINT_SUFFIX}, made required and given the queue default.
+     */
+    public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AzureStorageUtils.ENDPOINT_SUFFIX)
+            .displayName("Endpoint Suffix")
+            .description("Storage accounts in public Azure always use a common FQDN suffix. " +
+                    "Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).")
+            .required(true)
+            .defaultValue(AzureServiceEndpoints.DEFAULT_QUEUE_ENDPOINT_SUFFIX)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    /** Controller Service that supplies the credentials used to build the queue client. */
+    public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+            .name("Credentials Service")
+            .displayName("Credentials Service")
+            .description("Controller Service used to obtain Azure Storage Credentials.")
+            .identifiesControllerService(AzureStorageCredentialsService_v12.class)
+            .required(true)
+            .build();
+
+    /**
+     * Timeout applied to requests against Azure Queue Storage.
+     * customValidate restricts the value to more than 0 and at most 30 seconds.
+     */
+    public static final PropertyDescriptor REQUEST_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Request Timeout")
+            .displayName("Request Timeout")
+            .description("The timeout for read or write requests to Azure Queue Storage. " +
+                    "Defaults to 10 seconds.")
+            .required(true)
+            .defaultValue("10 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    /** Destination for FlowFiles that were processed successfully. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All successfully processed FlowFiles are routed to this relationship")
+            .build();
+
+    /** Destination for FlowFiles whose operation failed. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Unsuccessful operations will be transferred to the failure relationship.")
+            .build();
+
+    // Immutable default relationship set returned by getRelationships().
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    // Names of the FlowFile attributes that describe a queue message.
+    static final String URI_ATTRIBUTE = "azure.queue.uri";
+    static final String INSERTION_TIME_ATTRIBUTE = "azure.queue.insertionTime";
+    static final String EXPIRATION_TIME_ATTRIBUTE = "azure.queue.expirationTime";
+    static final String MESSAGE_ID_ATTRIBUTE = "azure.queue.messageId";
+    static final String POP_RECEIPT_ATTRIBUTE = "azure.queue.popReceipt";
+
+    /**
+     * Returns the unmodifiable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}.
+     *
+     * @return the relationships declared by this base class
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     * Validates the Request Timeout range and the proxy configuration.
+     *
+     * @param validationContext provides the configured property values
+     * @return a mutable list holding one invalid result per detected problem; empty when valid
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        // Compare as long: narrowing to int first lets very large periods wrap around
+        // into the accepted range and pass validation.
+        final long requestTimeout = validationContext.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS);
+
+        if (requestTimeout <= 0 || requestTimeout > 30) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(REQUEST_TIMEOUT.getDisplayName())
+                    .explanation(REQUEST_TIMEOUT.getDisplayName() + " should be greater than 0 secs " +
+                            "and less than or equal to 30 secs")
+                    .build());
+        }
+
+        AzureStorageUtils.validateProxySpec(validationContext, results);
+
+        return results;
+    }
+
+    /**
+     * Builds a {@link QueueClient} from the processor configuration.
+     *
+     * @param context  provides the credentials service, endpoint suffix, queue name and proxy settings
+     * @param flowFile FlowFile whose attributes are passed to the credentials service and used to
+     *                 evaluate the queue name; may be {@code null}, in which case no attributes are used
+     * @return a client for the configured queue
+     */
+    protected final QueueClient createQueueClient(final ProcessContext context, final FlowFile flowFile) {
+        final QueueClientBuilder clientBuilder = new QueueClientBuilder();
+
+        final AzureStorageCredentialsService_v12 storageCredentialsService =
+                context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
+        final Map<String, String> attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes();
+        final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = storageCredentialsService.getCredentialsDetails(attributes);
+        processCredentials(clientBuilder, storageCredentialsDetails);
+        processProxyOptions(clientBuilder, context);
+
+        // Endpoint has the form https://<account name>.<endpoint suffix>
+        final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).getValue();
+        clientBuilder.endpoint(String.format("https://%s.%s", storageCredentialsDetails.getAccountName(), endpointSuffix));
+
+        final String queueName = context.getProperty(QUEUE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        clientBuilder.queueName(queueName);
+        return clientBuilder.buildClient();
+    }
+
+    /**
+     * Configures the builder with the credential that matches the credentials type
+     * reported by the given details.
+     *
+     * @param clientBuilder             builder that receives the credential
+     * @param storageCredentialsDetails source of the credentials type and its values
+     * @throws IllegalArgumentException if the credentials type is not handled here
+     */
+    private void processCredentials(final QueueClientBuilder clientBuilder, final AzureStorageCredentialsDetails_v12 storageCredentialsDetails) {
+        switch (storageCredentialsDetails.getCredentialsType()) {
+            case ACCOUNT_KEY:
+                clientBuilder.credential(new StorageSharedKeyCredential(storageCredentialsDetails.getAccountName(), storageCredentialsDetails.getAccountKey()));
+                break;
+            case SAS_TOKEN:
+                clientBuilder.credential(new AzureSasCredential(storageCredentialsDetails.getSasToken()));
+                break;
+            case MANAGED_IDENTITY:
+                clientBuilder.credential(new ManagedIdentityCredentialBuilder()
+                        .clientId(storageCredentialsDetails.getManagedIdentityClientId())
+                        .build());
+                break;
+            case SERVICE_PRINCIPAL:
+                clientBuilder.credential(new ClientSecretCredentialBuilder()
+                        .tenantId(storageCredentialsDetails.getServicePrincipalTenantId())
+                        .clientId(storageCredentialsDetails.getServicePrincipalClientId())
+                        .clientSecret(storageCredentialsDetails.getServicePrincipalClientSecret())
+                        .build());
+                break;
+            case ACCESS_TOKEN:
+                // Hands back the access token held by the details for every token request.
+                TokenCredential credential = tokenRequestContext -> Mono.just(storageCredentialsDetails.getAccessToken());
+                clientBuilder.credential(credential);
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled credentials type: " + storageCredentialsDetails.getCredentialsType());
+        }
+    }
+
+    private void processProxyOptions(final QueueClientBuilder clientBuilder,
+                                     final PropertyContext propertyContext) {
+        final ProxyOptions proxyOptions = 
AzureStorageUtils.getProxyOptions(propertyContext);
+        final NettyAsyncHttpClientBuilder nettyClientBuilder = new 
NettyAsyncHttpClientBuilder();
+        nettyClientBuilder.proxy(proxyOptions);
+
+        final HttpClient nettyClient = nettyClientBuilder.build();
+        clientBuilder.httpClient(nettyClient);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
index dc5a138574..6e29905b5d 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
@@ -26,6 +26,7 @@ import 
org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -63,6 +64,7 @@ import java.util.concurrent.TimeUnit;
         @WritesAttribute(attribute = "azure.queue.messageId", description = 
"The ID of the retrieved message"),
         @WritesAttribute(attribute = "azure.queue.popReceipt", description = 
"The pop receipt of the retrieved message"),
 })
+@DeprecationNotice(alternatives = GetAzureQueueStorage_v12.class)
 public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
 
     public static final PropertyDescriptor AUTO_DELETE = new 
PropertyDescriptor.Builder()
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage_v12.java
new file mode 100644
index 0000000000..28ba1e1b4e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage_v12.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.azure.core.util.Context;
+import com.azure.storage.queue.QueueClient;
+import com.azure.storage.queue.models.QueueMessageItem;
+import com.azure.storage.queue.models.QueueStorageException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.EXPIRATION_TIME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.INSERTION_TIME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.POP_RECEIPT_ATTRIBUTE;
+import static 
org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.URI_ATTRIBUTE;
+
+@SeeAlso({PutAzureQueueStorage.class})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
+@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. 
The retrieved messages will be deleted from the queue by default. If the 
requirement is " +
+        "to consume messages without deleting them, set 'Auto Delete Messages' 
to 'false'. Note: There might be chances of receiving duplicates in situations 
like " +
+        "when a message is received but was unable to be deleted from the 
queue due to some unexpected situations.")
+@WritesAttributes({
+        @WritesAttribute(attribute = URI_ATTRIBUTE, description = "The 
absolute URI of the configured Azure Queue Storage"),
+        @WritesAttribute(attribute = INSERTION_TIME_ATTRIBUTE, description = 
"The time when the message was inserted into the queue storage"),
+        @WritesAttribute(attribute = EXPIRATION_TIME_ATTRIBUTE, description = 
"The time when the message will expire from the queue storage"),
+        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = "The 
ID of the retrieved message"),
+        @WritesAttribute(attribute = POP_RECEIPT_ATTRIBUTE, description = "The 
pop receipt of the retrieved message"),
+})
+public class GetAzureQueueStorage_v12 extends AbstractAzureQueueStorage_v12 {
+    /** Whether received messages are deleted from the queue after the session is committed. */
+    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+            .name("Auto Delete Messages")
+            .displayName("Auto Delete Messages")
+            .description("Specifies whether the received message is to be automatically deleted from the queue.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /** Maximum number of messages requested per invocation; validated to the range 1..32. */
+    public static final PropertyDescriptor MESSAGE_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Message Batch Size")
+            .displayName("Message Batch Size")
+            .description("The number of messages to be retrieved from the queue.")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 32, true))
+            .defaultValue("32")
+            .build();
+
+    /** How long a received message stays hidden from other consumers; range-checked in customValidate. */
+    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Visibility Timeout")
+            .displayName("Visibility Timeout")
+            .description("The duration during which the retrieved message should be invisible to other consumers.")
+            .required(true)
+            .defaultValue("30 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Proxy types offered by the proxy configuration property.
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS};
+
+    // Supported properties, in the order they are listed here.
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
+            Arrays.asList(
+                    QUEUE_NAME,
+                    ENDPOINT_SUFFIX,
+                    STORAGE_CREDENTIALS_SERVICE,
+                    AUTO_DELETE,
+                    MESSAGE_BATCH_SIZE,
+                    VISIBILITY_TIMEOUT,
+                    REQUEST_TIMEOUT,
+                    ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS)
+            )
+    );
+
+    // This processor only exposes the success relationship.
+    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
+
+    // 7 days is the maximum timeout as per https://learn.microsoft.com/en-us/rest/api/storageservices/get-messages
+    private static final Duration MAX_VISIBILITY_TIMEOUT = Duration.ofDays(7);
+
+    /**
+     * Returns the unmodifiable list of properties supported by this processor.
+     *
+     * @return the supported property descriptors
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Adds the Visibility Timeout range check (more than 0 seconds, at most 7 days)
+     * to the validation performed by the parent class.
+     *
+     * @param validationContext provides the configured property values
+     * @return the parent's validation results plus any Visibility Timeout violations
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        // Copy instead of downcasting: the parent is only declared to return a Collection.
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final Duration visibilityTimeout = Duration.ofSeconds(
+                validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS)
+        );
+
+        if (visibilityTimeout.getSeconds() <= 0) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(VISIBILITY_TIMEOUT.getDisplayName())
+                    .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs")
+                    .build());
+        }
+
+        if (MAX_VISIBILITY_TIMEOUT.compareTo(visibilityTimeout) < 0) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(VISIBILITY_TIMEOUT.getDisplayName())
+                    .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should not be greater than 7 days")
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Returns the relationships of this processor, which consist of {@code REL_SUCCESS} only.
+     *
+     * @return a singleton set holding the success relationship
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final int batchSize = 
context.getProperty(MESSAGE_BATCH_SIZE).asInteger();
+        final int visibilityTimeoutInSecs = 
context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final int requestTimeoutInSecs = 
context.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final boolean autoDelete = 
context.getProperty(AUTO_DELETE).asBoolean();
+
+        final QueueClient queueClient = createQueueClient(context, null);
+        final Iterable<QueueMessageItem> retrievedMessagesIterable;
+        try {
+            retrievedMessagesIterable = queueClient.receiveMessages(
+                    batchSize,
+                    Duration.ofSeconds(visibilityTimeoutInSecs),
+                    Duration.ofSeconds(requestTimeoutInSecs),
+                    Context.NONE);
+        } catch (final QueueStorageException e) {
+            getLogger().error("Failed to retrieve messages from Azure Storage 
Queue", e);
+            context.yield();
+            return;
+        }
+
+        final Map<String, String> messagesToDelete = new LinkedHashMap<>();
+
+        for (final QueueMessageItem message : retrievedMessagesIterable) {
+            FlowFile flowFile = session.create();
+
+            final Map<String, String> attributes = new LinkedHashMap<>();
+            attributes.put("azure.queue.uri", queueClient.getQueueUrl());
+            attributes.put("azure.queue.insertionTime", 
message.getInsertionTime().toString());
+            attributes.put("azure.queue.expirationTime", 
message.getExpirationTime().toString());
+            attributes.put("azure.queue.messageId", message.getMessageId());
+            attributes.put("azure.queue.popReceipt", message.getPopReceipt());
+
+            if (autoDelete) {
+                messagesToDelete.put(message.getMessageId(), 
message.getPopReceipt());
+            }
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = session.write(flowFile, out -> 
out.write(message.getBody().toString().getBytes()));
+
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, 
queueClient.getQueueUrl().toString());
+        }
+
+        if (autoDelete) {
+            session.commitAsync(() -> {
+                for (final Map.Entry<String, String> entry : 
messagesToDelete.entrySet()) {
+                    final String messageId = entry.getKey();
+                    final String popReceipt = entry.getValue();
+                    queueClient.deleteMessage(messageId, popReceipt);
+                }
+            });
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
index d1e0c35f8c..10da98016f 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
@@ -33,6 +33,7 @@ import com.microsoft.azure.storage.queue.CloudQueueMessage;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -49,6 +50,7 @@ import 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
 @CapabilityDescription("Writes the content of the incoming FlowFiles to the 
configured Azure Queue Storage.")
+@DeprecationNotice(alternatives = PutAzureQueueStorage_v12.class)
 public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
 
     public static final PropertyDescriptor TTL = new 
PropertyDescriptor.Builder()
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage_v12.java
new file mode 100644
index 0000000000..e6389504c5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage_v12.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import com.azure.core.util.Context;
+import com.azure.storage.queue.QueueClient;
+import com.azure.storage.queue.models.QueueStorageException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Processor that sends the content of each incoming FlowFile as a single message to an
+ * Azure Storage queue through the v12 {@link QueueClient}.
+ *
+ * <p>FlowFiles are routed to {@code REL_SUCCESS} once the message has been sent, or penalized
+ * and routed to {@code REL_FAILURE} when the client raises a {@link QueueStorageException}.</p>
+ */
+@SeeAlso({GetAzureQueueStorage_v12.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
+@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
+public class PutAzureQueueStorage_v12 extends AbstractAzureQueueStorage_v12 {
+    /** How long a sent message may stay in the queue; validated to be strictly positive. */
+    public static final PropertyDescriptor MESSAGE_TIME_TO_LIVE = new PropertyDescriptor.Builder()
+            .name("Message Time To Live")
+            .displayName("Message Time To Live")
+            .description("Maximum time to allow the message to be in the queue")
+            .required(true)
+            .defaultValue("7 days")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    /**
+     * Visibility timeout handed to the queue client when the message is sent; validated to be
+     * greater than zero and at most {@link #MAX_VISIBILITY_TIMEOUT}.
+     */
+    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Visibility Timeout")
+            .displayName("Visibility Timeout")
+            .description("The length of time during which the message will be invisible after it is read. " +
+                    "If the processing unit fails to delete the message after it is read, then the message will reappear in the queue.")
+            .required(true)
+            .defaultValue("30 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS};
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
+            Arrays.asList(
+                    QUEUE_NAME,
+                    ENDPOINT_SUFFIX,
+                    STORAGE_CREDENTIALS_SERVICE,
+                    MESSAGE_TIME_TO_LIVE,
+                    VISIBILITY_TIMEOUT,
+                    REQUEST_TIMEOUT,
+                    ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS)
+            )
+    );
+
+    // 7 days is the maximum timeout as per https://learn.microsoft.com/en-us/rest/api/storageservices/get-messages
+    private static final Duration MAX_VISIBILITY_TIMEOUT = Duration.ofDays(7);
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Adds range checks for {@link #VISIBILITY_TIMEOUT} and {@link #MESSAGE_TIME_TO_LIVE} on top of
+     * the results produced by the superclass.
+     *
+     * @param validationContext context giving access to the configured property values
+     * @return the superclass results plus one invalid result per violated range
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        // Copy instead of casting: the declared return type is only a Collection, so it is not
+        // guaranteed to be a List or to accept additions.
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final Duration visibilityTimeout = Duration.ofSeconds(
+                validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS)
+        );
+
+        if (visibilityTimeout.getSeconds() <= 0) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(VISIBILITY_TIMEOUT.getDisplayName())
+                    .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs")
+                    .build());
+        }
+
+        if (MAX_VISIBILITY_TIMEOUT.compareTo(visibilityTimeout) < 0) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(VISIBILITY_TIMEOUT.getDisplayName())
+                    .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should not be greater than 7 days")
+                    .build());
+        }
+
+        // Keep the value as a long: narrowing to int can wrap a large period to zero or a negative
+        // number and reject a perfectly valid configuration.
+        final long ttlSeconds = validationContext.getProperty(MESSAGE_TIME_TO_LIVE).asTimePeriod(TimeUnit.SECONDS);
+        if (ttlSeconds <= 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject(MESSAGE_TIME_TO_LIVE.getDisplayName())
+                    .valid(false)
+                    .explanation(MESSAGE_TIME_TO_LIVE.getDisplayName() + " should be any positive number")
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Sends the content of one FlowFile as a queue message and routes the FlowFile according to
+     * the outcome.
+     *
+     * @param context provides the configured timeouts and the settings used to build the client
+     * @param session session used to read, transfer and report on the FlowFile
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        // Decode with an explicit charset so the message text does not depend on the JVM default.
+        final String flowFileContent = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+
+        final Duration visibilityTimeout = toDuration(context, VISIBILITY_TIMEOUT);
+        final Duration timeToLive = toDuration(context, MESSAGE_TIME_TO_LIVE);
+        final Duration requestTimeout = toDuration(context, REQUEST_TIMEOUT);
+
+        final QueueClient queueClient = createQueueClient(context, flowFile);
+        try {
+            queueClient.sendMessageWithResponse(
+                    flowFileContent,
+                    visibilityTimeout,
+                    timeToLive,
+                    requestTimeout,
+                    Context.NONE
+            );
+        } catch (final QueueStorageException e) {
+            getLogger().error("Failed to write message to Azure Queue Storage", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        session.getProvenanceReporter().send(flowFile, queueClient.getQueueUrl().toString(), transmissionMillis);
+    }
+
+    /**
+     * Reads a time-period property as a {@link Duration} with second precision, without
+     * narrowing the value to {@code int}.
+     */
+    private static Duration toDuration(final ProcessContext context, final PropertyDescriptor descriptor) {
+        return Duration.ofSeconds(context.getProperty(descriptor).asTimePeriod(TimeUnit.SECONDS));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
index 696f86d27f..ff63b1538f 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
@@ -32,12 +32,12 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Provides credentials details for Azure Blob processors
+ * Provides credentials details for Azure Storage processors
  *
  * @see AbstractControllerService
  */
-@Tags({"azure", "microsoft", "cloud", "storage", "blob", "credentials"})
-@CapabilityDescription("Provides credentials for Azure Blob processors using 
Azure Blob Storage client library v12.")
+@Tags({"azure", "microsoft", "cloud", "storage", "blob", "credentials", 
"queue"})
+@CapabilityDescription("Provides credentials for Azure Storage processors 
using Azure Storage client library v12.")
 public class AzureStorageCredentialsControllerService_v12 extends 
AbstractControllerService implements AzureStorageCredentialsService_v12 {
 
     public static final PropertyDescriptor ACCOUNT_NAME = new 
PropertyDescriptor.Builder()
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f9e86a4add..62c618c7c0 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -27,3 +27,5 @@ 
org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage_v12
+org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage_v12
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractTestAzureQueueStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractTestAzureQueueStorage_v12.java
new file mode 100644
index 0000000000..6d5d663d9c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractTestAzureQueueStorage_v12.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import org.apache.nifi.reporting.InitializationException;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType;
+import org.apache.nifi.util.TestRunner;
+
+/**
+ * Base class for the Azure Queue Storage v12 processor tests: holds the test runner and a
+ * credentials controller service, and offers a helper that registers and configures that service.
+ */
+public abstract class AbstractTestAzureQueueStorage_v12 {
+    // Identifier under which the credentials service is registered with the runner.
+    public static final String CREDENTIALS_SERVICE_IDENTIFIER = 
"credentials-service";
+    // Not initialized here; subclasses assign it before calling setupStorageCredentialsService().
+    protected TestRunner runner;
+    protected AzureStorageCredentialsService_v12 credentialsService = new 
AzureStorageCredentialsControllerService_v12();
+
+    /**
+     * Registers {@link #credentialsService} with {@link #runner} and sets placeholder account-key
+     * credentials on it. The service is not enabled here.
+     *
+     * @throws InitializationException declared for the controller service registration
+     */
+    protected void setupStorageCredentialsService() throws 
InitializationException {
+        runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, 
credentialsService);
+        runner.setProperty(credentialsService, 
AzureStorageCredentialsControllerService_v12.ACCOUNT_NAME, "account-name");
+        runner.setProperty(credentialsService, 
AzureStorageCredentialsControllerService_v12.CREDENTIALS_TYPE, 
AzureStorageCredentialsType.ACCOUNT_KEY.getAllowableValue());
+        runner.setProperty(credentialsService, 
AzureStorageCredentialsControllerService_v12.ACCOUNT_KEY, "account-key");
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage_v12.java
new file mode 100644
index 0000000000..3f91929426
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage_v12.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Property validation tests for {@link GetAzureQueueStorage_v12}: each case sets the visibility
+ * and/or request timeout and checks whether the processor configuration is accepted.
+ */
+public class TestGetAzureQueueStorage_v12 extends AbstractTestAzureQueueStorage_v12 {
+    @BeforeEach
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(GetAzureQueueStorage_v12.class);
+        setupStorageCredentialsService();
+        runner.enableControllerService(credentialsService);
+        runner.setProperty(GetAzureQueueStorage_v12.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_IDENTIFIER);
+        runner.setProperty(GetAzureQueueStorage_v12.QUEUE_NAME, "queue");
+        runner.setProperty(GetAzureQueueStorage_v12.MESSAGE_BATCH_SIZE, "10");
+    }
+
+    @Test
+    public void testValidVisibilityTimeout() {
+        setVisibilityTimeout("10 secs");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testInvalidVisibilityTimeoutZeroSecs() {
+        setVisibilityTimeout("0 secs");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testInvalidVisibilityTimeoutMoreThanSevenDays() {
+        setVisibilityTimeout("8 days");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testValidRequestTimeout() {
+        setVisibilityTimeout("10 secs");
+        setRequestTimeout("15 secs");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testInvalidRequestTimeoutZeroSecs() {
+        setVisibilityTimeout("10 secs");
+        setRequestTimeout("0 secs");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testInvalidRequestTimeoutMoreThanThirtySecs() {
+        setVisibilityTimeout("10 secs");
+        setRequestTimeout("31 secs");
+        runner.assertNotValid();
+    }
+
+    // Sets the processor's visibility timeout property to the given time period.
+    private void setVisibilityTimeout(final String period) {
+        runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, period);
+    }
+
+    // Sets the processor's request timeout property to the given time period.
+    private void setRequestTimeout(final String period) {
+        runner.setProperty(GetAzureQueueStorage_v12.REQUEST_TIMEOUT, period);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage_v12.java
new file mode 100644
index 0000000000..ce0afbeddf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage_v12.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.queue;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Property validation tests for {@link PutAzureQueueStorage_v12}: each case sets the message
+ * time to live, the visibility timeout and optionally the request timeout, then checks whether
+ * the processor configuration is accepted.
+ */
+public class TestPutAzureQueueStorage_v12 extends AbstractTestAzureQueueStorage_v12 {
+    @BeforeEach
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(PutAzureQueueStorage_v12.class);
+        setupStorageCredentialsService();
+        runner.enableControllerService(credentialsService);
+        runner.setProperty(PutAzureQueueStorage_v12.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_IDENTIFIER);
+        runner.setProperty(PutAzureQueueStorage_v12.QUEUE_NAME, "queue");
+    }
+
+    @Test
+    public void testValidVisibilityTimeout() {
+        setTimeToLiveAndVisibility("7 days", "10 secs");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testInvalidVisibilityTimeoutZeroSecs() {
+        setTimeToLiveAndVisibility("7 days", "0 secs");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testInvalidVisibilityTimeoutMoreThanSevenDays() {
+        setTimeToLiveAndVisibility("7 days", "8 days");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testValidTTL() {
+        setTimeToLiveAndVisibility("7 days", "30 secs");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testInvalidTTLZeroSecs() {
+        setTimeToLiveAndVisibility("0 secs", "30 secs");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testValidRequestTimeout() {
+        setTimeToLiveAndVisibility("7 days", "30 secs");
+        setRequestTimeout("15 secs");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testInvalidRequestTimeoutZeroSecs() {
+        setTimeToLiveAndVisibility("7 days", "30 secs");
+        setRequestTimeout("0 secs");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testInvalidRequestTimeoutMoreThanThirtySecs() {
+        setTimeToLiveAndVisibility("7 days", "30 secs");
+        setRequestTimeout("31 secs");
+        runner.assertNotValid();
+    }
+
+    // Sets the time to live first and the visibility timeout second, as every test case does.
+    private void setTimeToLiveAndVisibility(final String timeToLive, final String visibilityTimeout) {
+        runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, timeToLive);
+        runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, visibilityTimeout);
+    }
+
+    // Sets the processor's request timeout property to the given time period.
+    private void setRequestTimeout(final String period) {
+        runner.setProperty(PutAzureQueueStorage_v12.REQUEST_TIMEOUT, period);
+    }
+}

Reply via email to