turcsanyip commented on a change in pull request #4430:
URL: https://github.com/apache/nifi/pull/4430#discussion_r462330562



##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java
##########
@@ -77,16 +78,37 @@
         final String accountKey = 
validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).getValue();
         final String sasToken = 
validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).getValue();
 
-        if (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)) {
+        final Boolean accountKeyIsSet   = 
validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).isSet();
+        final Boolean sasTokenIsSet     = 
validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).isSet();
+
+        if (StringUtils.isAllBlank(accountKey, sasToken)) {
             results.add(new 
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
                     .valid(false)
                     .explanation("either " + 
AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " or " + 
AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName() + " is required")
                     .build());
-        } else if (StringUtils.isNotBlank(accountKey) && 
StringUtils.isNotBlank(sasToken)) {
-            results.add(new 
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+        }
+
+        if (BooleanUtils.xor(new Boolean[] { accountKeyIsSet, sasTokenIsSet 
})) {
+            if (accountKeyIsSet) {
+                if (StringUtils.isBlank(accountKey)) {
+                    results.add(new 
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
                         .valid(false)
-                        .explanation("cannot set both " + 
AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " and " + 
AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName())
+                        
.explanation(AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " must be set 
when using Account Key authentication.")
                         .build());
+                }
+            } else if (sasTokenIsSet) {
+                if (StringUtils.isBlank(sasToken)) {
+                    results.add(new 
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+                        .valid(false)
+                        
.explanation(AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName() + " must be set 
when using SAS token authentication.")
+                        .build());
+                }

Review comment:
       I believe these two cases should be handled via 
`StandardValidators.NON_BLANK_VALIDATOR` directly on the properties, which would 
be less complicated / more straightforward.
   Nevertheless, checking for blank values is a good point, and the current 
`NON_EMPTY_VALIDATOR` is not the best option.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -147,18 +154,40 @@ private void configureControllerService(String 
accountName, String accountKey, S
         
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, 
CREDENTIALS_SERVICE_VALUE);
     }
 
-    private void 
assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails
 storageCredentialsDetails) {
+    private void assertStorageCredentialsDetailsAccountNameAndAccountKey(
+            AzureStorageCredentialsDetails storageCredentialsDetails) {
+
         assertEquals(ACCOUNT_NAME_VALUE, 
storageCredentialsDetails.getStorageAccountName());
-        assertTrue(storageCredentialsDetails.getStorageCredentials() 
instanceof StorageCredentialsAccountAndKey);
-        StorageCredentialsAccountAndKey storageCredentials = 
(StorageCredentialsAccountAndKey) 
storageCredentialsDetails.getStorageCredentials();
+        assertTrue(storageCredentialsDetails.getCredentialType() == 
AzureStorageCredentialsDetails.CredentialType.STORAGE_ACCOUNT_KEY);
+        assertTrue(storageCredentialsDetails.getStorageSharedKeyCredential() 
instanceof StorageSharedKeyCredential);
+
+        // test credential object
+        StorageSharedKeyCredential storageCredentials = 
(StorageSharedKeyCredential) storageCredentialsDetails

Review comment:
       Unnecessary type cast.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
     protected List<BlobInfo> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException {
         String containerName = 
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
         String prefix = 
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
-        if (prefix == null) {
-            prefix = "";
-        }
+
         final List<BlobInfo> listing = new ArrayList<>();
         try {
-            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
-            CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
+            BlobServiceClient blobServiceClient = 
AzureStorageUtils.createBlobServiceClient(context, null);
+            BlobContainerClient blobContainerClient = 
blobServiceClient.getBlobContainerClient(containerName);
 
-            final OperationContext operationContext = new OperationContext();
-            AzureStorageUtils.setProxy(operationContext, context);
+            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+                .setPrefix(prefix)
+                .setDetails(new BlobListDetails()
+                    .setRetrieveMetadata(true));
 
-            for (ListBlobItem blob : container.listBlobs(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-                if (blob instanceof CloudBlob) {
-                    CloudBlob cloudBlob = (CloudBlob) blob;
-                    BlobProperties properties = cloudBlob.getProperties();
-                    StorageUri uri = 
cloudBlob.getSnapshotQualifiedStorageUri();
+            blobContainerClient.listBlobs().forEach(blob -> {
+                if (blob instanceof BlobItem) {
+                    BlobItem blobItem = (BlobItem) blob;

Review comment:
       Unnecessary type check / type cast. `listBlobs()` returns 
`PagedIterable<BlobItem>`.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, 
ProxySpec.SOCKS};
+
+    private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final 
Proxy.Type proxyType) {
+        for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+            if (item.toProxyType() == proxyType) {
+                return item;
+            }
+        }
+        return null;
+    }
+
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration
+            .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+    public static HttpClient createHttpClient(final PropertyContext 
propertyContext) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(propertyContext);
+        final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+        final HttpClient client = new NettyAsyncHttpClientBuilder()
+            .proxy(proxyOptions)
+            .build();
+
+        return client;
+    }
+
+    public static void validateProxySpec(final ValidationContext context, 
final Collection<ValidationResult> results) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
+
+        final String proxyServerHost = proxyConfig.getProxyServerHost();
+        final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+        final String proxyServerUser = proxyConfig.getProxyUserName();
+        final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+        if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort == 
null)
+            || (StringUtils.isBlank(proxyServerHost) && proxyServerPort != 
null)) {
+            results.add(new 
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+                    .explanation(
+                            "When specifying address information, both `host` 
and `port` information must be provided.")

Review comment:
       We use "normal" apostrophes (') in UI messages instead of backticks (`).

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -180,21 +198,27 @@ public static AzureStorageCredentialsDetails 
createStorageCredentialsDetails(Pro
         final String accountKey = 
context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
         final String sasToken = 
context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
 
+        AzureStorageCredentialsDetails azureStorageCredentialsDetails;
+
         if (StringUtils.isBlank(accountName)) {
             throw new IllegalArgumentException(String.format("'%s' must not be 
empty.", ACCOUNT_NAME.getDisplayName()));
         }
 
-        StorageCredentials storageCredentials;
+        if (StringUtils.isAllBlank(accountKey, sasToken)) {
+            throw new IllegalArgumentException(String.format("Either '%s' or 
'%s' must be defined.", ACCOUNT_KEY.getDisplayName(),
+                                               
PROP_SAS_TOKEN.getDisplayName()));

Review comment:
       It is checked in the `else` branch below. Seems to be duplicated here.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -180,21 +198,27 @@ public static AzureStorageCredentialsDetails 
createStorageCredentialsDetails(Pro
         final String accountKey = 
context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
         final String sasToken = 
context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
 
+        AzureStorageCredentialsDetails azureStorageCredentialsDetails;
+
         if (StringUtils.isBlank(accountName)) {
             throw new IllegalArgumentException(String.format("'%s' must not be 
empty.", ACCOUNT_NAME.getDisplayName()));
         }
 
-        StorageCredentials storageCredentials;
+        if (StringUtils.isAllBlank(accountKey, sasToken)) {
+            throw new IllegalArgumentException(String.format("Either '%s' or 
'%s' must be defined.", ACCOUNT_KEY.getDisplayName(),
+                                               
PROP_SAS_TOKEN.getDisplayName()));
+        }
 
-        if (StringUtils.isNotBlank(accountKey)) {
-            storageCredentials = new 
StorageCredentialsAccountAndKey(accountName, accountKey);
+        if(StringUtils.isNotBlank(accountKey)) {

Review comment:
       Minor: `if (`
   Please use a formatter (the default settings of IntelliJ / Eclipse should be 
fine).

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
##########
@@ -77,8 +76,19 @@
             <version>${azure-eventhubs-eph.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-storage</artifactId>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-blob</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-queue</artifactId>
+            <version>${azure-storage-queue.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>

Review comment:
       Could you please define it in the parent pom's `dependencyManagement` 
section? (similar to the blob dependency)

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -146,24 +142,46 @@ private AzureStorageUtils() {
     }
 
     /**
-     * Create CloudBlobClient instance.
+     * Create BlobServiceClient instance.
      * @param flowFile An incoming FlowFile can be used for NiFi Expression 
Language evaluation to derive
      *                 Account Name, Account Key or SAS Token. This can be 
null if not available.
      */
-    public static CloudBlobClient createCloudBlobClient(ProcessContext 
context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+    public static BlobServiceClient createBlobServiceClient(PropertyContext 
context, FlowFile flowFile) {
         final AzureStorageCredentialsDetails storageCredentialsDetails = 
getStorageCredentialsDetails(context, flowFile);
-        final CloudStorageAccount cloudStorageAccount = new 
CloudStorageAccount(
-            storageCredentialsDetails.getStorageCredentials(),
-            true,
-            storageCredentialsDetails.getStorageSuffix(),
-            storageCredentialsDetails.getStorageAccountName());
-        final CloudBlobClient cloudBlobClient = 
cloudStorageAccount.createCloudBlobClient();
-
-        return cloudBlobClient;
+
+        final String storageSuffix = 
StringUtils.isNotBlank(storageCredentialsDetails.getStorageSuffix())
+            ? storageCredentialsDetails.getStorageSuffix()
+            : "blob.core.windows.net";
+        final String endpoint = String.format("https://%s.%s", 
storageCredentialsDetails.getStorageAccountName(),
+                                                               storageSuffix);
+
+        // use HttpClient object to allow proxy setting
+        final HttpClient httpClient = 
AzureProxyUtils.createHttpClient(context);
+        final BlobServiceClientBuilder blobServiceClientBuilder = new 
BlobServiceClientBuilder()
+                                                                      
.endpoint(endpoint)
+                                                                      
.httpClient(httpClient);
+        BlobServiceClient blobServiceClient;
+
+        switch(storageCredentialsDetails.getCredentialType()) {
+            case SAS_TOKEN:
+                blobServiceClient = 
blobServiceClientBuilder.sasToken(storageCredentialsDetails.getSasToken())
+                    .buildClient();
+                break;
+            case STORAGE_ACCOUNT_KEY:
+                blobServiceClient =  
blobServiceClientBuilder.credential(storageCredentialsDetails.getStorageSharedKeyCredential())
+                    .buildClient();
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Invalid 
credential type '%s'!", 
storageCredentialsDetails.getCredentialType().toString()));
+        }
+
+        return blobServiceClient;
     }
 
     public static AzureStorageCredentialsDetails 
getStorageCredentialsDetails(PropertyContext context, FlowFile flowFile) {
-        final Map<String, String> attributes = flowFile != null ? 
flowFile.getAttributes() : Collections.emptyMap();
+        final Map<String, String> attributes = flowFile != null
+            ? flowFile.getAttributes()
+            : Collections.emptyMap();

Review comment:
       Please try to avoid reformatting existing code (assuming it was already 
formatted properly).

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -146,24 +142,46 @@ private AzureStorageUtils() {
     }
 
     /**
-     * Create CloudBlobClient instance.
+     * Create BlobServiceClient instance.
      * @param flowFile An incoming FlowFile can be used for NiFi Expression 
Language evaluation to derive
      *                 Account Name, Account Key or SAS Token. This can be 
null if not available.
      */
-    public static CloudBlobClient createCloudBlobClient(ProcessContext 
context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+    public static BlobServiceClient createBlobServiceClient(PropertyContext 
context, FlowFile flowFile) {
         final AzureStorageCredentialsDetails storageCredentialsDetails = 
getStorageCredentialsDetails(context, flowFile);
-        final CloudStorageAccount cloudStorageAccount = new 
CloudStorageAccount(
-            storageCredentialsDetails.getStorageCredentials(),
-            true,
-            storageCredentialsDetails.getStorageSuffix(),
-            storageCredentialsDetails.getStorageAccountName());
-        final CloudBlobClient cloudBlobClient = 
cloudStorageAccount.createCloudBlobClient();
-
-        return cloudBlobClient;
+
+        final String storageSuffix = 
StringUtils.isNotBlank(storageCredentialsDetails.getStorageSuffix())
+            ? storageCredentialsDetails.getStorageSuffix()
+            : "blob.core.windows.net";
+        final String endpoint = String.format("https://%s.%s", 
storageCredentialsDetails.getStorageAccountName(),
+                                                               storageSuffix);
+
+        // use HttpClient object to allow proxy setting
+        final HttpClient httpClient = 
AzureProxyUtils.createHttpClient(context);
+        final BlobServiceClientBuilder blobServiceClientBuilder = new 
BlobServiceClientBuilder()
+                                                                      
.endpoint(endpoint)
+                                                                      
.httpClient(httpClient);
+        BlobServiceClient blobServiceClient;
+
+        switch(storageCredentialsDetails.getCredentialType()) {

Review comment:
       Minor: `switch (`

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
##########
@@ -25,13 +25,12 @@
 
     <dependencies>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-storage</artifactId>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-blob</artifactId>

Review comment:
       The `azure-storage-common` dependency would be sufficient in the services 
api module.
   Currently it comes in as a transitive dependency of `azure-storage-blob`, but 
the blob artifact itself is not needed here.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, 
ProxySpec.SOCKS};
+
+    private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final 
Proxy.Type proxyType) {
+        for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+            if (item.toProxyType() == proxyType) {
+                return item;
+            }
+        }
+        return null;
+    }
+
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration
+            .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+    public static HttpClient createHttpClient(final PropertyContext 
propertyContext) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(propertyContext);
+        final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+        final HttpClient client = new NettyAsyncHttpClientBuilder()
+            .proxy(proxyOptions)
+            .build();
+
+        return client;
+    }
+
+    public static void validateProxySpec(final ValidationContext context, 
final Collection<ValidationResult> results) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
+
+        final String proxyServerHost = proxyConfig.getProxyServerHost();
+        final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+        final String proxyServerUser = proxyConfig.getProxyUserName();
+        final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+        if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort == 
null)
+            || (StringUtils.isBlank(proxyServerHost) && proxyServerPort != 
null)) {
+            results.add(new 
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+                    .explanation(
+                            "When specifying address information, both `host` 
and `port` information must be provided.")
+                    .build());
+        }
+
+        if ((StringUtils.isBlank(proxyServerUser) && 
StringUtils.isNotBlank(proxyServerPassword))
+            || (StringUtils.isNotBlank(proxyServerUser) && 
StringUtils.isBlank(proxyServerPassword))) {
+            results.add(new 
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+                    .explanation(
+                        "When specifying credentials, both `user` and 
`password` must be provided.")
+                    .build());
+        }
+
+        ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+    }
+
+    public static ProxyOptions getProxyOptions(final ProxyConfiguration 
proxyConfig) {
+        final String proxyServerHost = proxyConfig.getProxyServerHost();
+        final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+        final String proxyServerUser = proxyConfig.getProxyUserName();
+        final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+        final Boolean proxyServerProvided = 
StringUtils.isNotBlank(proxyServerHost) && proxyServerPort != null;
+        final Boolean proxyCredentialsProvided = 
StringUtils.isNotBlank(proxyServerUser) && 
StringUtils.isNotBlank(proxyServerPassword);

Review comment:
       Variables can be primitive `boolean`.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
     protected List<BlobInfo> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException {
         String containerName = 
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
         String prefix = 
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
-        if (prefix == null) {
-            prefix = "";
-        }
+
         final List<BlobInfo> listing = new ArrayList<>();
         try {
-            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
-            CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
+            BlobServiceClient blobServiceClient = 
AzureStorageUtils.createBlobServiceClient(context, null);
+            BlobContainerClient blobContainerClient = 
blobServiceClient.getBlobContainerClient(containerName);
 
-            final OperationContext operationContext = new OperationContext();
-            AzureStorageUtils.setProxy(operationContext, context);
+            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+                .setPrefix(prefix)
+                .setDetails(new BlobListDetails()
+                    .setRetrieveMetadata(true));
 
-            for (ListBlobItem blob : container.listBlobs(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-                if (blob instanceof CloudBlob) {
-                    CloudBlob cloudBlob = (CloudBlob) blob;
-                    BlobProperties properties = cloudBlob.getProperties();
-                    StorageUri uri = 
cloudBlob.getSnapshotQualifiedStorageUri();
+            blobContainerClient.listBlobs().forEach(blob -> {
+                if (blob instanceof BlobItem) {
+                    BlobItem blobItem = (BlobItem) blob;
+                    BlobItemProperties properties = blobItem.getProperties();
+                    BlobClient blobClient = 
blobContainerClient.getBlobClient(blobItem.getName());
+                    String uri = blobClient.getBlobUrl();
 
                     Builder builder = new BlobInfo.Builder()
-                                              
.primaryUri(uri.getPrimaryUri().toString())
-                                              .blobName(cloudBlob.getName())
-                                              .containerName(containerName)
+                                              .primaryUri(uri)
+                                              .blobName(blobItem.getName())
+                                              
.blobType(properties.getBlobType().toString())

Review comment:
       Minor backward incompatibility issue: `Block` (orig) vs `BlockBlob` (new)

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
     protected List<BlobInfo> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException {
         String containerName = 
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
         String prefix = 
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
-        if (prefix == null) {
-            prefix = "";
-        }
+
         final List<BlobInfo> listing = new ArrayList<>();
         try {
-            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
-            CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
+            BlobServiceClient blobServiceClient = 
AzureStorageUtils.createBlobServiceClient(context, null);
+            BlobContainerClient blobContainerClient = 
blobServiceClient.getBlobContainerClient(containerName);
 
-            final OperationContext operationContext = new OperationContext();
-            AzureStorageUtils.setProxy(operationContext, context);
+            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+                .setPrefix(prefix)
+                .setDetails(new BlobListDetails()
+                    .setRetrieveMetadata(true));
 
-            for (ListBlobItem blob : container.listBlobs(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-                if (blob instanceof CloudBlob) {
-                    CloudBlob cloudBlob = (CloudBlob) blob;
-                    BlobProperties properties = cloudBlob.getProperties();
-                    StorageUri uri = 
cloudBlob.getSnapshotQualifiedStorageUri();
+            blobContainerClient.listBlobs().forEach(blob -> {
+                if (blob instanceof BlobItem) {
+                    BlobItem blobItem = (BlobItem) blob;
+                    BlobItemProperties properties = blobItem.getProperties();
+                    BlobClient blobClient = 
blobContainerClient.getBlobClient(blobItem.getName());
+                    String uri = blobClient.getBlobUrl();
 
                     Builder builder = new BlobInfo.Builder()
-                                              
.primaryUri(uri.getPrimaryUri().toString())
-                                              .blobName(cloudBlob.getName())
-                                              .containerName(containerName)
+                                              .primaryUri(uri)

Review comment:
       The same URL encoding issue as in the case of the Put processor.
   It also affects the `filename` attribute of the outgoing FlowFile, which would 
be calculated as:
   `primaryUri.substring(primaryUri.lastIndexOf('/') + 1);`

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -37,17 +28,22 @@
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.proxy.ProxyConfiguration;
-import org.apache.nifi.proxy.ProxySpec;
 import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
 import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+
 public final class AzureStorageUtils {
+
     public static final String BLOCK = "Block";
     public static final String PAGE = "Page";
+    public static final String APPEND = "Append";

Review comment:
       What is the point of these constants?
   The existing ones are not used either (as far as I can see).

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -146,24 +142,46 @@ private AzureStorageUtils() {
     }
 
     /**
-     * Create CloudBlobClient instance.
+     * Create BlobServiceClient instance.
      * @param flowFile An incoming FlowFile can be used for NiFi Expression 
Language evaluation to derive
      *                 Account Name, Account Key or SAS Token. This can be 
null if not available.
      */
-    public static CloudBlobClient createCloudBlobClient(ProcessContext 
context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+    public static BlobServiceClient createBlobServiceClient(PropertyContext 
context, FlowFile flowFile) {
         final AzureStorageCredentialsDetails storageCredentialsDetails = 
getStorageCredentialsDetails(context, flowFile);
-        final CloudStorageAccount cloudStorageAccount = new 
CloudStorageAccount(
-            storageCredentialsDetails.getStorageCredentials(),
-            true,
-            storageCredentialsDetails.getStorageSuffix(),
-            storageCredentialsDetails.getStorageAccountName());
-        final CloudBlobClient cloudBlobClient = 
cloudStorageAccount.createCloudBlobClient();
-
-        return cloudBlobClient;
+
+        final String storageSuffix = 
StringUtils.isNotBlank(storageCredentialsDetails.getStorageSuffix())
+            ? storageCredentialsDetails.getStorageSuffix()
+            : "blob.core.windows.net";

Review comment:
       Does it work for Queue Storage as well?
   The "official" suffix would be `queue.core.windows.net` for queues.
   With my storage account I get back different IPs with nslookup but they 
might point to the same place behind the scenes...

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -147,18 +154,40 @@ private void configureControllerService(String 
accountName, String accountKey, S
         
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, 
CREDENTIALS_SERVICE_VALUE);
     }
 
-    private void 
assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails
 storageCredentialsDetails) {
+    private void assertStorageCredentialsDetailsAccountNameAndAccountKey(
+            AzureStorageCredentialsDetails storageCredentialsDetails) {
+
         assertEquals(ACCOUNT_NAME_VALUE, 
storageCredentialsDetails.getStorageAccountName());
-        assertTrue(storageCredentialsDetails.getStorageCredentials() 
instanceof StorageCredentialsAccountAndKey);
-        StorageCredentialsAccountAndKey storageCredentials = 
(StorageCredentialsAccountAndKey) 
storageCredentialsDetails.getStorageCredentials();
+        assertTrue(storageCredentialsDetails.getCredentialType() == 
AzureStorageCredentialsDetails.CredentialType.STORAGE_ACCOUNT_KEY);
+        assertTrue(storageCredentialsDetails.getStorageSharedKeyCredential() 
instanceof StorageSharedKeyCredential);

Review comment:
       This check is always true, so the assert has no real effect.
   The same applies in `assertStorageCredentialsDetailsAccountNameAndSasToken`.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
##########
@@ -48,20 +48,17 @@
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor {
 
-    private static final AllowableValue DELETE_SNAPSHOTS_NONE = new 
AllowableValue(DeleteSnapshotsOption.NONE.name(), "None", "Delete the blob 
only.");
-
-    private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new 
AllowableValue(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS.name(), "Include 
Snapshots", "Delete the blob and its snapshots.");
+    private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new 
AllowableValue(DeleteSnapshotsOptionType.INCLUDE.name(), "Include Snapshots", 
"Delete the blob and its snapshots.");
 
-    private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new 
AllowableValue(DeleteSnapshotsOption.DELETE_SNAPSHOTS_ONLY.name(), "Delete 
Snapshots Only", "Delete only the blob's snapshots.");
+    private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new 
AllowableValue(DeleteSnapshotsOptionType.ONLY.name(), "Delete Snapshots Only", 
"Delete only the blob's snapshots.");

Review comment:
       This is a backward-incompatible change for existing flows.
   We need to keep the `NONE` option.
   We also need to keep the original names of the `AllowableValue`s and 
translate them to the new enum values in `onTrigger()`.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
     protected List<BlobInfo> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException {
         String containerName = 
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
         String prefix = 
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
-        if (prefix == null) {
-            prefix = "";
-        }
+
         final List<BlobInfo> listing = new ArrayList<>();
         try {
-            CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
-            CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
+            BlobServiceClient blobServiceClient = 
AzureStorageUtils.createBlobServiceClient(context, null);
+            BlobContainerClient blobContainerClient = 
blobServiceClient.getBlobContainerClient(containerName);
 
-            final OperationContext operationContext = new OperationContext();
-            AzureStorageUtils.setProxy(operationContext, context);
+            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+                .setPrefix(prefix)
+                .setDetails(new BlobListDetails()
+                    .setRetrieveMetadata(true));
 
-            for (ListBlobItem blob : container.listBlobs(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-                if (blob instanceof CloudBlob) {
-                    CloudBlob cloudBlob = (CloudBlob) blob;
-                    BlobProperties properties = cloudBlob.getProperties();
-                    StorageUri uri = 
cloudBlob.getSnapshotQualifiedStorageUri();
+            blobContainerClient.listBlobs().forEach(blob -> {
+                if (blob instanceof BlobItem) {
+                    BlobItem blobItem = (BlobItem) blob;
+                    BlobItemProperties properties = blobItem.getProperties();
+                    BlobClient blobClient = 
blobContainerClient.getBlobClient(blobItem.getName());
+                    String uri = blobClient.getBlobUrl();
 
                     Builder builder = new BlobInfo.Builder()
-                                              
.primaryUri(uri.getPrimaryUri().toString())
-                                              .blobName(cloudBlob.getName())
-                                              .containerName(containerName)
+                                              .primaryUri(uri)
+                                              .blobName(blobItem.getName())
+                                              
.blobType(properties.getBlobType().toString())
+                                              
.containerName(blobClient.getContainerName())
                                               
.contentType(properties.getContentType())
                                               
.contentLanguage(properties.getContentLanguage())
-                                              .etag(properties.getEtag())
-                                              
.lastModifiedTime(properties.getLastModified().getTime())
-                                              .length(properties.getLength());
-
-                    if (uri.getSecondaryUri() != null) {
-                        builder.secondaryUri(uri.getSecondaryUri().toString());
-                    }
-
-                    if (blob instanceof CloudBlockBlob) {
-                        builder.blobType(AzureStorageUtils.BLOCK);
-                    } else {
-                        builder.blobType(AzureStorageUtils.PAGE);
-                    }
+                                              .etag(properties.getETag())
+                                              
.lastModifiedTime(properties.getLastModified().toEpochSecond())
+                                              
.length(properties.getContentLength());
+

Review comment:
       Minor backward incompatibility: secondaryUri is missing.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
##########
@@ -114,14 +112,14 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
                 }
 
                 try {
-                    blob.upload(in, length, null, null, operationContext);
+                    blob.upload(in, length);
                     BlobProperties properties = blob.getProperties();
                     attributes.put("azure.container", containerName);
-                    attributes.put("azure.primaryUri", 
blob.getSnapshotQualifiedUri().toString());
-                    attributes.put("azure.etag", properties.getEtag());
+                    attributes.put("azure.primaryUri", blob.getBlobUrl());

Review comment:
       `BlobClient.getBlobUrl()` returns the slashes (/) in the blob name in 
URL-encoded form.
   E.g.: 
`https://mystorageaccount.blob.core.windows.net/my-blob-container/dir1%2Ffile1`
   Earlier it was (and I think it still should be): 
`https://mystorageaccount.blob.core.windows.net/my-blob-container/dir1/file1`
   It seems to me to be a bug in the new client library.
   It affects the List processor too.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -147,18 +154,40 @@ private void configureControllerService(String 
accountName, String accountKey, S
         
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, 
CREDENTIALS_SERVICE_VALUE);
     }
 
-    private void 
assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails
 storageCredentialsDetails) {
+    private void assertStorageCredentialsDetailsAccountNameAndAccountKey(
+            AzureStorageCredentialsDetails storageCredentialsDetails) {
+
         assertEquals(ACCOUNT_NAME_VALUE, 
storageCredentialsDetails.getStorageAccountName());
-        assertTrue(storageCredentialsDetails.getStorageCredentials() 
instanceof StorageCredentialsAccountAndKey);
-        StorageCredentialsAccountAndKey storageCredentials = 
(StorageCredentialsAccountAndKey) 
storageCredentialsDetails.getStorageCredentials();
+        assertTrue(storageCredentialsDetails.getCredentialType() == 
AzureStorageCredentialsDetails.CredentialType.STORAGE_ACCOUNT_KEY);

Review comment:
       `assertSame` should be used
   Also in `assertStorageCredentialsDetailsAccountNameAndSasToken`

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureProxyUtilsGetProxyOptions.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.ProxyOptions.Type;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAzureProxyUtilsGetProxyOptions {
+
+    private MockProcessContext processContext;
+    private ProxyConfiguration proxyConfig;
+
+    private static final String PROXY_CONFIG_SERVICE_VALUE = 
"ProxyConfigurationService";
+    private static final String PROXY_HOST = "localhost";
+    private static final Integer PROXY_PORT = 9000;
+    private static final String PROXY_USER = "microsoft";
+    private static final String PROXY_PASSWORD = "azure";
+
+    @Before
+    public void setUp() {
+        final Processor processor = new ListAzureBlobStorage();
+        processContext = new MockProcessContext(processor);
+    }
+
+    private void configureMockedHTTPProxyService(String proxyHost, Integer 
proxyPort, String proxyUser, String proxyUserPassword) {
+        proxyConfig = new ProxyConfiguration();
+        proxyConfig.setProxyType(Proxy.Type.HTTP);
+
+        if(StringUtils.isNotBlank(proxyHost)) {
+            proxyConfig.setProxyServerHost(proxyHost);
+        }
+        if(proxyPort != null) {
+            proxyConfig.setProxyServerPort(proxyPort);
+        }
+        if(StringUtils.isNotBlank(proxyUser)) {
+            proxyConfig.setProxyUserName(proxyUser);
+        }
+        if(StringUtils.isNotBlank(proxyUserPassword)) {
+            proxyConfig.setProxyUserPassword(proxyUserPassword);
+        }
+
+        MockProxyConfigurationService mockProxyConfigurationService = new 
MockProxyConfigurationService(proxyConfig);
+        // set mocked proxy service
+        processContext.addControllerService(mockProxyConfigurationService, 
PROXY_CONFIG_SERVICE_VALUE);
+        
processContext.setProperty(AzureProxyUtils.PROXY_CONFIGURATION_SERVICE, 
PROXY_CONFIG_SERVICE_VALUE);
+    }
+
+    @Test
+    public void testHTTPProxy() {
+        configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, null, null);
+        final MockProxyConfigurationService mockProxyConfigurationService = 
processContext.getProperty(
+            
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+        final ProxyOptions proxyOptions = 
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+        final InetSocketAddress socketAddress = new 
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+        assertEquals(proxyOptions.getAddress(), socketAddress);
+        assertEquals(proxyOptions.getType(), Type.HTTP);

Review comment:
       The expected value should be the 1st argument.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -54,25 +57,27 @@ public void setUp() {
     public void testAccountNameAndAccountKeyConfiguredOnProcessor() {
         configureProcessorProperties(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, 
null);
 
-        AzureStorageCredentialsDetails storageCredentialsDetails = 
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+        AzureStorageCredentialsDetails storageCredentialsDetails = 
AzureStorageUtils
+                .getStorageCredentialsDetails(processContext, null);
 
         
assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
     }
 
     @Test
     public void testAccountNameAndSasTokenConfiguredOnProcessor() {
         configureProcessorProperties(ACCOUNT_NAME_VALUE, null, 
SAS_TOKEN_VALUE);
-
-        AzureStorageCredentialsDetails storageCredentialsDetails = 
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+        AzureStorageCredentialsDetails storageCredentialsDetails = 
AzureStorageUtils
+                .getStorageCredentialsDetails(processContext, null);
 
         
assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
     }
 
     @Test
     public void testAccountNameAndAccountKeyConfiguredOnControllerService() {
-        configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, 
null);
 
-        AzureStorageCredentialsDetails storageCredentialsDetails = 
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+        configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, 
null);
+        AzureStorageCredentialsDetails storageCredentialsDetails = 
AzureStorageUtils

Review comment:
       Please do not reformat the existing code.
   The max. line length is 200 in checkstyle config.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureProxyUtilsGetProxyOptions.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.ProxyOptions.Type;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAzureProxyUtilsGetProxyOptions {
+
+    private MockProcessContext processContext;
+    private ProxyConfiguration proxyConfig;
+
+    private static final String PROXY_CONFIG_SERVICE_VALUE = 
"ProxyConfigurationService";
+    private static final String PROXY_HOST = "localhost";
+    private static final Integer PROXY_PORT = 9000;
+    private static final String PROXY_USER = "microsoft";
+    private static final String PROXY_PASSWORD = "azure";
+
+    @Before
+    public void setUp() {
+        final Processor processor = new ListAzureBlobStorage();
+        processContext = new MockProcessContext(processor);
+    }
+
+    private void configureMockedHTTPProxyService(String proxyHost, Integer 
proxyPort, String proxyUser, String proxyUserPassword) {
+        proxyConfig = new ProxyConfiguration();
+        proxyConfig.setProxyType(Proxy.Type.HTTP);
+
+        if(StringUtils.isNotBlank(proxyHost)) {
+            proxyConfig.setProxyServerHost(proxyHost);
+        }
+        if(proxyPort != null) {
+            proxyConfig.setProxyServerPort(proxyPort);
+        }
+        if(StringUtils.isNotBlank(proxyUser)) {
+            proxyConfig.setProxyUserName(proxyUser);
+        }
+        if(StringUtils.isNotBlank(proxyUserPassword)) {
+            proxyConfig.setProxyUserPassword(proxyUserPassword);
+        }
+
+        MockProxyConfigurationService mockProxyConfigurationService = new 
MockProxyConfigurationService(proxyConfig);
+        // set mocked proxy service
+        processContext.addControllerService(mockProxyConfigurationService, 
PROXY_CONFIG_SERVICE_VALUE);
+        
processContext.setProperty(AzureProxyUtils.PROXY_CONFIGURATION_SERVICE, 
PROXY_CONFIG_SERVICE_VALUE);
+    }
+
+    @Test
+    public void testHTTPProxy() {
+        configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, null, null);
+        final MockProxyConfigurationService mockProxyConfigurationService = 
processContext.getProperty(
+            
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+        final ProxyOptions proxyOptions = 
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+        final InetSocketAddress socketAddress = new 
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+        assertEquals(proxyOptions.getAddress(), socketAddress);
+        assertEquals(proxyOptions.getType(), Type.HTTP);
+        // null asserts
+        assertNull(proxyOptions.getUsername());
+        assertNull(proxyOptions.getPassword());
+    }
+
+    @Test
+    public void testHTTPProxyWithAuth() {
+        configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, PROXY_USER, 
PROXY_PASSWORD);
+        final MockProxyConfigurationService mockProxyConfigurationService = 
processContext.getProperty(
+            
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+        final ProxyOptions proxyOptions = 
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+        final InetSocketAddress socketAddress = new 
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+        assertEquals(proxyOptions.getAddress(), socketAddress);
+        assertEquals(proxyOptions.getType(), Type.HTTP);
+        assertEquals(proxyOptions.getUsername(), PROXY_USER);
+        assertEquals(proxyOptions.getPassword(), PROXY_PASSWORD);
+    }
+
+    @Test
+    public void testHTTPProxyWithOnlyUser() {
+        configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, PROXY_USER, 
null);
+        final MockProxyConfigurationService mockProxyConfigurationService = 
processContext.getProperty(
+            
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+        final ProxyOptions proxyOptions = 
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+        final InetSocketAddress socketAddress = new 
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+        assertEquals(proxyOptions.getAddress(), socketAddress);
+        assertEquals(proxyOptions.getType(), Type.HTTP);
+        // null asserts
+        assertNull(proxyOptions.getUsername());
+        assertNull(proxyOptions.getPassword());
+    }
+
+    @Test
+    public void testHTTPProxyWithOnlyProxyPort() {
+        configureMockedHTTPProxyService(null, PROXY_PORT, null, null);
+        final MockProxyConfigurationService mockProxyConfigurationService = 
processContext.getProperty(
+            
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+        final ProxyOptions proxyOptions = 
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+
+        // null asserts
+        assertNull(proxyOptions);
+    }
+
+    @Test
+    public void testHTTPProxyWithoutInput() {
+        configureMockedHTTPProxyService(null, null, null, null);
+        final MockProxyConfigurationService mockProxyConfigurationService = 
processContext.getProperty(
+            
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+        final ProxyOptions proxyOptions = 
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+
+        // null asserts
+        assertNull(proxyOptions);
+    }
+
+    private class MockProxyConfigurationService extends 
AbstractControllerService implements ProxyConfigurationService {

Review comment:
       The nested class could be static.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
##########
@@ -82,21 +79,23 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
         final long startNanos = System.nanoTime();
         final String containerName = 
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
         final String blobPath = 
context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
-        final String deleteSnapshotOptions = 
context.getProperty(DELETE_SNAPSHOTS_OPTION).getValue();
+        final String deleteSnapshotOption = 
context.getProperty(DELETE_SNAPSHOTS_OPTION).isSet()

Review comment:
       It seems `deleteSnapshotsOption` would be the right name.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, 
ProxySpec.SOCKS};
+
+    private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final 
Proxy.Type proxyType) {
+        for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+            if (item.toProxyType() == proxyType) {
+                return item;
+            }
+        }
+        return null;
+    }
+
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration
+            .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+    public static HttpClient createHttpClient(final PropertyContext 
propertyContext) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(propertyContext);
+        final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+        final HttpClient client = new NettyAsyncHttpClientBuilder()
+            .proxy(proxyOptions)
+            .build();
+
+        return client;
+    }
+
+    public static void validateProxySpec(final ValidationContext context, 
final Collection<ValidationResult> results) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
+
+        final String proxyServerHost = proxyConfig.getProxyServerHost();
+        final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+        final String proxyServerUser = proxyConfig.getProxyUserName();
+        final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+        if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort == 
null)
+            || (StringUtils.isBlank(proxyServerHost) && proxyServerPort != 
null)) {
+            results.add(new 
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)

Review comment:
       "Proxy Configuration" would be a more understandable subject for the end 
user I think.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, 
ProxySpec.SOCKS};
+
+    private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final 
Proxy.Type proxyType) {
+        for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+            if (item.toProxyType() == proxyType) {
+                return item;
+            }
+        }
+        return null;
+    }
+
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration
+            .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+    public static HttpClient createHttpClient(final PropertyContext 
propertyContext) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(propertyContext);
+        final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+        final HttpClient client = new NettyAsyncHttpClientBuilder()
+            .proxy(proxyOptions)
+            .build();
+
+        return client;
+    }
+
+    public static void validateProxySpec(final ValidationContext context, 
final Collection<ValidationResult> results) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
+
+        final String proxyServerHost = proxyConfig.getProxyServerHost();
+        final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+        final String proxyServerUser = proxyConfig.getProxyUserName();
+        final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+        if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort == 
null)

Review comment:
       These checks are relevant for `HTTP` and `SOCKS` proxies, but not 
for `DIRECT`.
   In the case of `DIRECT` (that is, no proxy), validation could instead check 
that no other properties are specified.

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, 
ProxySpec.SOCKS};
+
+    private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final 
Proxy.Type proxyType) {
+        for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+            if (item.toProxyType() == proxyType) {
+                return item;
+            }
+        }
+        return null;
+    }
+
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration
+            .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+    public static HttpClient createHttpClient(final PropertyContext 
propertyContext) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(propertyContext);
+        final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+        final HttpClient client = new NettyAsyncHttpClientBuilder()
+            .proxy(proxyOptions)
+            .build();
+
+        return client;
+    }
+
+    public static void validateProxySpec(final ValidationContext context, 
final Collection<ValidationResult> results) {
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
+
+        final String proxyServerHost = proxyConfig.getProxyServerHost();
+        final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+        final String proxyServerUser = proxyConfig.getProxyUserName();
+        final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+        if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort == 
null)
+            || (StringUtils.isBlank(proxyServerHost) && proxyServerPort != 
null)) {
+            results.add(new 
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+                    .explanation(
+                            "When specifying address information, both `host` 
and `port` information must be provided.")
+                    .build());
+        }
+
+        if ((StringUtils.isBlank(proxyServerUser) && 
StringUtils.isNotBlank(proxyServerPassword))
+            || (StringUtils.isNotBlank(proxyServerUser) && 
StringUtils.isBlank(proxyServerPassword))) {
+            results.add(new 
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+                    .explanation(
+                        "When specifying credentials, both `user` and 
`password` must be provided.")
+                    .build());
+        }
+
+        ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+    }
+
+    public static ProxyOptions getProxyOptions(final ProxyConfiguration 
proxyConfig) {
+        final String proxyServerHost = proxyConfig.getProxyServerHost();
+        final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+        final String proxyServerUser = proxyConfig.getProxyUserName();
+        final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+        final Boolean proxyServerProvided = 
StringUtils.isNotBlank(proxyServerHost) && proxyServerPort != null;
+        final Boolean proxyCredentialsProvided = 
StringUtils.isNotBlank(proxyServerUser) && 
StringUtils.isNotBlank(proxyServerPassword);
+
+        // if no endpoint is provided, return zero
+        if (!proxyServerProvided) {
+            return null;
+        }
+
+        // translate Proxy.Type to ProxyOptions.Type
+        final ProxyOptions.Type proxyType = 
getProxyOptionsTypeFromProxyType(proxyConfig.getProxyType());
+        final InetSocketAddress socketAddress = new 
InetSocketAddress(proxyServerHost, proxyServerPort);
+
+        final ProxyOptions proxyOptions = new ProxyOptions(proxyType, 
socketAddress);
+
+        if (proxyCredentialsProvided) {
+            return proxyOptions.setCredentials(proxyServerUser, 
proxyServerPassword);
+        } else {
+            return proxyOptions;
+        }

Review comment:
       ```
           if (proxyCredentialsProvided) {
               proxyOptions.setCredentials(proxyServerUser, proxyServerPassword);
           }
   
           return proxyOptions;
   ```
   would be more readable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to