This is an automated email from the ASF dual-hosted git repository.

nsabonyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 5d4ced2f22 NIFI-11586 Added AzureStorageCredentialsControllerServiceLookup_v12
5d4ced2f22 is described below

commit 5d4ced2f22e7d04356dc74a1211dabefc189a4f9
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue May 23 22:50:47 2023 +0200

    NIFI-11586 Added AzureStorageCredentialsControllerServiceLookup_v12
    
    Also added client caching in blob v_12 processors which was needed to support multiple credentials provided by the new lookup service
    
    Backported
    This closes #7300
    
    Signed-off-by: Nandor Soma Abonyi <[email protected]>
    (cherry picked from commit 963518d943c75c8d053834be51c2947470767229)
---
 .../azure/AbstractAzureBlobProcessor_v12.java      |  68 ++--------
 .../AbstractAzureDataLakeStorageProcessor.java     |   2 +-
 .../azure/storage/DeleteAzureBlobStorage_v12.java  |   2 +-
 .../azure/storage/FetchAzureBlobStorage_v12.java   |   2 +-
 .../azure/storage/ListAzureBlobStorage_v12.java    |  15 ++-
 .../azure/storage/ListAzureDataLakeStorage.java    |   2 +-
 .../azure/storage/PutAzureBlobStorage_v12.java     |   2 +-
 .../utils/AbstractStorageClientFactory.java        |  63 +++++++++
 .../storage/utils/BlobServiceClientFactory.java    |  79 +++++++++++
 .../utils/DataLakeServiceClientFactory.java        |  51 +------
 ...rageCredentialsControllerServiceLookup_v12.java |  56 ++++++++
 ...ureStorageCredentialsControllerService_v12.java |   3 +-
 .../org.apache.nifi.controller.ControllerService   |   1 +
 .../utils/BlobServiceClientFactoryTest.java        |  80 +++++++++++
 ...rageCredentialsControllerServiceLookup_v12.java | 147 +++++++++++++++++++++
 ...ureStorageCredentialsControllerService_v12.java |  12 +-
 .../AzureStorageCredentialsDetails_v12.java        |  41 ++++++
 .../AzureStorageCredentialsService_v12.java        |   5 +-
 18 files changed, 513 insertions(+), 118 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
index c1dd3dfdad..a131b2ec92 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
@@ -16,29 +16,22 @@
  */
 package org.apache.nifi.processors.azure;
 
-import com.azure.core.credential.AzureSasCredential;
-import com.azure.core.credential.TokenCredential;
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.identity.ManagedIdentityCredentialBuilder;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.models.BlobProperties;
-import com.azure.storage.common.StorageSharedKeyCredential;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
 import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
 import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
-import reactor.core.publisher.Mono;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -92,7 +85,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends 
AbstractProcessor {
             REL_FAILURE
     )));
 
-    private BlobServiceClient storageClient;
+    private volatile BlobServiceClientFactory clientFactory;
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -101,64 +94,23 @@ public abstract class AbstractAzureBlobProcessor_v12 
extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(ProcessContext context) {
-        storageClient = createStorageClient(context);
+        clientFactory = new BlobServiceClientFactory(getLogger(), 
getProxyOptions(context));
     }
 
     @OnStopped
     public void onStopped() {
-        storageClient = null;
+        clientFactory = null;
     }
 
-    protected BlobServiceClient getStorageClient() {
-        return storageClient;
-    }
+    protected BlobServiceClient getStorageClient(PropertyContext context, 
FlowFile flowFile) {
+        final Map<String, String> attributes = flowFile != null ? 
flowFile.getAttributes() : Collections.emptyMap();
 
-    public static BlobServiceClient createStorageClient(PropertyContext 
context) {
         final AzureStorageCredentialsService_v12 credentialsService = 
context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
-        final AzureStorageCredentialsDetails_v12 credentialsDetails = 
credentialsService.getCredentialsDetails();
-
-        final BlobServiceClientBuilder clientBuilder = new 
BlobServiceClientBuilder();
-        clientBuilder.endpoint(String.format("https://%s.%s", 
credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
-
-        final NettyAsyncHttpClientBuilder nettyClientBuilder = new 
NettyAsyncHttpClientBuilder();
-
-        nettyClientBuilder.proxy(getProxyOptions(context));
+        final AzureStorageCredentialsDetails_v12 credentialsDetails = 
credentialsService.getCredentialsDetails(attributes);
 
-        final HttpClient nettyClient = nettyClientBuilder.build();
-        clientBuilder.httpClient(nettyClient);
+        final BlobServiceClient storageClient = 
clientFactory.getStorageClient(credentialsDetails);
 
-        configureCredential(clientBuilder, credentialsService, 
credentialsDetails);
-
-        return clientBuilder.buildClient();
-    }
-
-    private static void configureCredential(BlobServiceClientBuilder 
clientBuilder, AzureStorageCredentialsService_v12 credentialsService, 
AzureStorageCredentialsDetails_v12 credentialsDetails) {
-        switch (credentialsDetails.getCredentialsType()) {
-            case ACCOUNT_KEY:
-                clientBuilder.credential(new 
StorageSharedKeyCredential(credentialsDetails.getAccountName(), 
credentialsDetails.getAccountKey()));
-                break;
-            case SAS_TOKEN:
-                clientBuilder.credential(new 
AzureSasCredential(credentialsDetails.getSasToken()));
-                break;
-            case MANAGED_IDENTITY:
-                clientBuilder.credential(new ManagedIdentityCredentialBuilder()
-                        
.clientId(credentialsDetails.getManagedIdentityClientId())
-                        .build());
-                break;
-            case SERVICE_PRINCIPAL:
-                clientBuilder.credential(new ClientSecretCredentialBuilder()
-                        
.tenantId(credentialsDetails.getServicePrincipalTenantId())
-                        
.clientId(credentialsDetails.getServicePrincipalClientId())
-                        
.clientSecret(credentialsDetails.getServicePrincipalClientSecret())
-                        .build());
-                break;
-            case ACCESS_TOKEN:
-                TokenCredential credential = tokenRequestContext -> 
Mono.just(credentialsService.getCredentialsDetails().getAccessToken());
-                clientBuilder.credential(credential);
-                break;
-            default:
-                throw new IllegalArgumentException("Unhandled credentials 
type: " + credentialsDetails.getCredentialsType());
-        }
+        return storageClient;
     }
 
     protected Map<String, String> createBlobAttributesMap(BlobClient 
blobClient) {
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 41e677d0a8..b16176f3fd 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -95,7 +95,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor 
extends AbstractProc
 
     public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
 
-    private DataLakeServiceClientFactory clientFactory;
+    private volatile DataLakeServiceClientFactory clientFactory;
 
     @Override
     public Set<Relationship> getRelationships() {
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
index b49cbd0247..6f820588bd 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
@@ -101,7 +101,7 @@ public class DeleteAzureBlobStorage_v12 extends 
AbstractAzureBlobProcessor_v12 {
 
         long startNanos = System.nanoTime();
         try {
-            BlobServiceClient storageClient = getStorageClient();
+            BlobServiceClient storageClient = getStorageClient(context, 
flowFile);
             BlobContainerClient containerClient = 
storageClient.getBlobContainerClient(containerName);
             BlobClient blobClient = containerClient.getBlobClient(blobName);
 
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
index 775c9d7ab0..4d9db2d5b1 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
@@ -152,7 +152,7 @@ public class FetchAzureBlobStorage_v12 extends 
AbstractAzureBlobProcessor_v12 im
         Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? 
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
 : null);
 
         try {
-            BlobServiceClient storageClient = getStorageClient();
+            BlobServiceClient storageClient = getStorageClient(context, 
flowFile);
             BlobContainerClient containerClient = 
storageClient.getBlobContainerClient(containerName);
             final BlobClient blobClient;
             if (isClientSideEncryptionEnabled(context)) {
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
index 9a36abf13f..0077f6f392 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
@@ -46,8 +46,11 @@ import 
org.apache.nifi.processor.util.list.ListedEntityTracker;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
 import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
+import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.serialization.record.RecordSchema;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -59,7 +62,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE;
-import static 
org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.createStorageClient;
+import static 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -148,7 +151,7 @@ public class ListAzureBlobStorage_v12 extends 
AbstractListAzureProcessor<BlobInf
             AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
     ));
 
-    private BlobServiceClient storageClient;
+    private volatile BlobServiceClientFactory clientFactory;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -157,12 +160,12 @@ public class ListAzureBlobStorage_v12 extends 
AbstractListAzureProcessor<BlobInf
 
     @OnScheduled
     public void onScheduled(ProcessContext context) {
-        storageClient = createStorageClient(context);
+        clientFactory = new BlobServiceClientFactory(getLogger(), 
getProxyOptions(context));
     }
 
     @OnStopped
     public void onStopped() {
-        storageClient = null;
+        clientFactory = null;
     }
 
     @Override
@@ -215,6 +218,10 @@ public class ListAzureBlobStorage_v12 extends 
AbstractListAzureProcessor<BlobInf
         try {
             final List<BlobInfo> listing = new ArrayList<>();
 
+            final AzureStorageCredentialsService_v12 credentialsService = 
context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
+            final AzureStorageCredentialsDetails_v12 credentialsDetails = 
credentialsService.getCredentialsDetails(Collections.emptyMap());
+            final BlobServiceClient storageClient = 
clientFactory.getStorageClient(credentialsDetails);
+
             final BlobContainerClient containerClient = 
storageClient.getBlobContainerClient(containerName);
 
             final ListBlobsOptions options = new ListBlobsOptions()
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index b568722e9d..5cfadc3430 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -175,7 +175,7 @@ public class ListAzureDataLakeStorage extends 
AbstractListAzureProcessor<ADLSFil
     private volatile Pattern filePattern;
     private volatile Pattern pathPattern;
 
-    private DataLakeServiceClientFactory clientFactory;
+    private volatile DataLakeServiceClientFactory clientFactory;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index a0aeb4e297..a7dfca488f 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -160,7 +160,7 @@ public class PutAzureBlobStorage_v12 extends 
AbstractAzureBlobProcessor_v12 impl
 
         long startNanos = System.nanoTime();
         try {
-            BlobServiceClient storageClient = getStorageClient();
+            BlobServiceClient storageClient = getStorageClient(context, 
flowFile);
             BlobContainerClient containerClient = 
storageClient.getBlobContainerClient(containerName);
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AbstractStorageClientFactory.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AbstractStorageClientFactory.java
new file mode 100644
index 0000000000..0c9d9df111
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AbstractStorageClientFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.core.http.ProxyOptions;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.logging.ComponentLog;
+
+abstract class AbstractStorageClientFactory<CREDENTIAL, CLIENT> {
+
+    private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
+
+    private final ComponentLog logger;
+    private final ProxyOptions proxyOptions;
+
+    private final Cache<CREDENTIAL, CLIENT> clientCache;
+
+    protected AbstractStorageClientFactory(final ComponentLog logger, final 
ProxyOptions proxyOptions) {
+        this.logger = logger;
+        this.proxyOptions = proxyOptions;
+        this.clientCache = createCache();
+    }
+
+    private Cache<CREDENTIAL, CLIENT> createCache() {
+        // Beware! By default, Caffeine does not perform cleanup and evict 
values
+        // "automatically" or instantly after a value expires. Because of that 
it
+        // can happen that there are more elements in the cache than the 
maximum size.
+        // See: https://github.com/ben-manes/caffeine/wiki/Cleanup
+        return Caffeine.newBuilder()
+                .maximumSize(STORAGE_CLIENT_CACHE_SIZE)
+                .build();
+    }
+
+    /**
+     * Retrieves storage client object
+     *
+     * @param credentialsDetails used for caching because it can contain 
properties that are results of an expression
+     * @return CLIENT
+     */
+    public CLIENT getStorageClient(final CREDENTIAL credentialsDetails) {
+        return clientCache.get(credentialsDetails, __ -> {
+            logger.debug(credentialsDetails.getClass().getSimpleName() + " is 
not found in the cache with the given credentials. Creating it.");
+            return createStorageClient(credentialsDetails, proxyOptions);
+        });
+    }
+
+    protected abstract CLIENT createStorageClient(CREDENTIAL 
credentialsDetails, ProxyOptions proxyOptions);
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
new file mode 100644
index 0000000000..b5a354b145
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.util.ClientOptions;
+import com.azure.core.util.HttpClientOptions;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.nifi.logging.ComponentLog;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
+import reactor.core.publisher.Mono;
+
+public class BlobServiceClientFactory extends 
AbstractStorageClientFactory<AzureStorageCredentialsDetails_v12, 
BlobServiceClient> {
+
+    public BlobServiceClientFactory(final ComponentLog logger, final 
ProxyOptions proxyOptions) {
+        super(logger, proxyOptions);
+    }
+
+    protected BlobServiceClient createStorageClient(final 
AzureStorageCredentialsDetails_v12 credentialsDetails, final ProxyOptions 
proxyOptions) {
+        final BlobServiceClientBuilder clientBuilder = new 
BlobServiceClientBuilder();
+        clientBuilder.endpoint(String.format("https://%s.%s", 
credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
+
+        final ClientOptions clientOptions = new 
HttpClientOptions().setProxyOptions(proxyOptions);
+        clientBuilder.clientOptions(clientOptions);
+
+        configureCredential(clientBuilder, credentialsDetails);
+
+        return clientBuilder.buildClient();
+    }
+
+    private void configureCredential(final BlobServiceClientBuilder 
clientBuilder, final AzureStorageCredentialsDetails_v12 credentialsDetails) {
+        switch (credentialsDetails.getCredentialsType()) {
+            case ACCOUNT_KEY:
+                clientBuilder.credential(new 
StorageSharedKeyCredential(credentialsDetails.getAccountName(), 
credentialsDetails.getAccountKey()));
+                break;
+            case SAS_TOKEN:
+                clientBuilder.credential(new 
AzureSasCredential(credentialsDetails.getSasToken()));
+                break;
+            case MANAGED_IDENTITY:
+                clientBuilder.credential(new ManagedIdentityCredentialBuilder()
+                        
.clientId(credentialsDetails.getManagedIdentityClientId())
+                        .build());
+                break;
+            case SERVICE_PRINCIPAL:
+                clientBuilder.credential(new ClientSecretCredentialBuilder()
+                        
.tenantId(credentialsDetails.getServicePrincipalTenantId())
+                        
.clientId(credentialsDetails.getServicePrincipalClientId())
+                        
.clientSecret(credentialsDetails.getServicePrincipalClientSecret())
+                        .build());
+                break;
+            case ACCESS_TOKEN:
+                TokenCredential credential = tokenRequestContext -> 
Mono.just(credentialsDetails.getAccessToken());
+                clientBuilder.credential(credential);
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled credentials 
type: " + credentialsDetails.getCredentialsType());
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
index 672618beb8..51ee6ff9b3 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
@@ -18,9 +18,9 @@ package org.apache.nifi.processors.azure.storage.utils;
 
 import com.azure.core.credential.AccessToken;
 import com.azure.core.credential.TokenCredential;
-import com.azure.core.http.HttpClient;
 import com.azure.core.http.ProxyOptions;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.core.util.ClientOptions;
+import com.azure.core.util.HttpClientOptions;
 import com.azure.identity.ClientSecretCredential;
 import com.azure.identity.ClientSecretCredentialBuilder;
 import com.azure.identity.ManagedIdentityCredential;
@@ -28,52 +28,18 @@ import com.azure.identity.ManagedIdentityCredentialBuilder;
 import com.azure.storage.common.StorageSharedKeyCredential;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
 import reactor.core.publisher.Mono;
 
-public class DataLakeServiceClientFactory {
-
-    private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
-
-    private final ComponentLog logger;
-    private final ProxyOptions proxyOptions;
-
-    private final Cache<ADLSCredentialsDetails, DataLakeServiceClient> 
clientCache;
+public class DataLakeServiceClientFactory extends 
AbstractStorageClientFactory<ADLSCredentialsDetails, DataLakeServiceClient> {
 
     public DataLakeServiceClientFactory(ComponentLog logger, ProxyOptions 
proxyOptions) {
-        this.logger = logger;
-        this.proxyOptions = proxyOptions;
-        this.clientCache = createCache();
-    }
-
-    private Cache<ADLSCredentialsDetails, DataLakeServiceClient> createCache() 
{
-        // Beware! By default, Caffeine does not perform cleanup and evict 
values
-        // "automatically" or instantly after a value expires. Because of that 
it
-        // can happen that there are more elements in the cache than the 
maximum size.
-        // See: https://github.com/ben-manes/caffeine/wiki/Cleanup
-        return Caffeine.newBuilder()
-                .maximumSize(STORAGE_CLIENT_CACHE_SIZE)
-                .build();
-    }
-
-    /**
-     * Retrieves a {@link DataLakeServiceClient}
-     *
-     * @param credentialsDetails used for caching because it can contain 
properties that are results of an expression
-     * @return DataLakeServiceClient
-     */
-    public DataLakeServiceClient getStorageClient(ADLSCredentialsDetails 
credentialsDetails) {
-        return clientCache.get(credentialsDetails, __ -> {
-            logger.debug("DataLakeServiceClient is not found in the cache with 
the given credentials. Creating it.");
-            return createStorageClient(credentialsDetails, proxyOptions);
-        });
+        super(logger, proxyOptions);
     }
 
-    private static DataLakeServiceClient 
createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions 
proxyOptions) {
+    protected DataLakeServiceClient createStorageClient(ADLSCredentialsDetails 
credentialsDetails, ProxyOptions proxyOptions) {
         final String accountName = credentialsDetails.getAccountName();
         final String accountKey = credentialsDetails.getAccountKey();
         final String sasToken = credentialsDetails.getSasToken();
@@ -114,11 +80,8 @@ public class DataLakeServiceClientFactory {
             throw new IllegalArgumentException("No valid credentials were 
provided");
         }
 
-        final NettyAsyncHttpClientBuilder nettyClientBuilder = new 
NettyAsyncHttpClientBuilder();
-        nettyClientBuilder.proxy(proxyOptions);
-
-        final HttpClient nettyClient = nettyClientBuilder.build();
-        dataLakeServiceClientBuilder.httpClient(nettyClient);
+        final ClientOptions clientOptions = new 
HttpClientOptions().setProxyOptions(proxyOptions);
+        dataLakeServiceClientBuilder.clientOptions(clientOptions);
 
         return dataLakeServiceClientBuilder.buildClient();
     }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup_v12.java
new file mode 100644
index 0000000000..5b2e19fbde
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup_v12.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import 
org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
+
+import java.util.Map;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", 
"credentials" })
+@CapabilityDescription("Provides an AzureStorageCredentialsService_v12 that 
can be used to dynamically select another AzureStorageCredentialsService_v12. " 
+
+        "This service requires an attribute named 
'azure.storage.credentials.name' to be passed in, and will throw an exception 
if the attribute is missing. " +
+        "The value of 'azure.storage.credentials.name' will be used to select 
the AzureStorageCredentialsService_v12 that has been registered with that name. 
" +
+        "This will allow multiple AzureStorageCredentialsServices_v12 to be 
defined and registered, and then selected dynamically at runtime by tagging 
flow files " +
+        "with the appropriate 'azure.storage.credentials.name' attribute.")
+@DynamicProperty(name = "The name to register 
AzureStorageCredentialsService_v12", value = "The 
AzureStorageCredentialsService_v12",
+        description = "If '" + 
AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE
 + "' attribute contains " +
+                "the name of the dynamic property, then the 
AzureStorageCredentialsService_v12 (registered in the value) will be selected.",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class AzureStorageCredentialsControllerServiceLookup_v12
+        extends 
AbstractSingleAttributeBasedControllerServiceLookup<AzureStorageCredentialsService_v12>
 implements AzureStorageCredentialsService_v12 { // is itself a credentials service; forwards each request to a looked-up one
+
+    public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = 
"azure.storage.credentials.name"; // attribute whose value names the registered credentials service to use
+
+    @Override
+    protected String getLookupAttribute() { // tells the base class which attribute to read
+        return AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE;
+    }
+
+    @Override
+    public Class<AzureStorageCredentialsService_v12> getServiceType() { // reports the service interface handled by this lookup
+        return AzureStorageCredentialsService_v12.class;
+    }
+
+    @Override
+    public AzureStorageCredentialsDetails_v12 getCredentialsDetails(final 
Map<String, String> attributes) { // the same attribute map is used for the lookup and passed on to the selected service
+        return lookupService(attributes).getCredentialsDetails(attributes); // lookupService() is inherited; presumably throws when the attribute is missing or unknown - confirm in base class
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
index ba8101d90d..696f86d27f 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
@@ -29,6 +29,7 @@ import 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Provides credentials details for Azure Blob processors
@@ -133,7 +134,7 @@ public class AzureStorageCredentialsControllerService_v12 
extends AbstractContro
     }
 
     @Override
-    public AzureStorageCredentialsDetails_v12 getCredentialsDetails() {
+    public AzureStorageCredentialsDetails_v12 
getCredentialsDetails(Map<String, String> attributes) {
         String accountName = context.getProperty(ACCOUNT_NAME).getValue();
         String endpointSuffix = 
context.getProperty(ENDPOINT_SUFFIX).getValue();
         AzureStorageCredentialsType credentialsType = 
AzureStorageCredentialsType.valueOf(context.getProperty(CREDENTIALS_TYPE).getValue());
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 0e5796ac58..42cd395867 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -20,4 +20,5 @@ 
org.apache.nifi.services.azure.storage.ADLSCredentialsControllerServiceLookup
 org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService
 
org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsControllerService
 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
+org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
 org.apache.nifi.services.azure.StandardAzureCredentialsControllerService
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactoryTest.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactoryTest.java
new file mode 100644
index 0000000000..986788795e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.nifi.logging.ComponentLog;
+import 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+@ExtendWith(MockitoExtension.class)
+class BlobServiceClientFactoryTest { // checks that getStorageClient() reuses or separates client instances depending on the credentials passed
+
+    @Mock
+    private ComponentLog logger; // mocked logger handed to every factory under test
+
+    @Test
+    void testThatServiceClientIsCachedByCredentials() {
+        final BlobServiceClientFactory clientFactory = new 
BlobServiceClientFactory(logger, null); // second argument is null (proxy options, presumably - TODO confirm against constructor)
+
+        final AzureStorageCredentialsDetails_v12 credentials = 
createCredentialDetails("account");
+
+        final BlobServiceClient clientOne = 
clientFactory.getStorageClient(credentials);
+        final BlobServiceClient clientTwo = 
clientFactory.getStorageClient(credentials);
+
+        assertSame(clientOne, clientTwo); // same credentials object twice -> same client instance
+    }
+
+    @Test
+    void testThatDifferentServiceClientIsReturnedForDifferentCredentials() {
+        final BlobServiceClientFactory clientFactory = new 
BlobServiceClientFactory(logger, null);
+
+        final AzureStorageCredentialsDetails_v12 credentialsOne = 
createCredentialDetails("accountOne");
+        final AzureStorageCredentialsDetails_v12 credentialsTwo = 
createCredentialDetails("accountTwo");
+
+        final BlobServiceClient clientOne = 
clientFactory.getStorageClient(credentialsOne);
+        final BlobServiceClient clientTwo = 
clientFactory.getStorageClient(credentialsTwo);
+
+        assertNotSame(clientOne, clientTwo); // credentials differ only in account name -> distinct clients
+    }
+
+    @Test
+    void testThatCachedClientIsReturnedAfterDifferentClientIsCreated() {
+        final BlobServiceClientFactory clientFactory = new 
BlobServiceClientFactory(logger, null);
+
+        final AzureStorageCredentialsDetails_v12 credentialsOne = 
createCredentialDetails("accountOne");
+        final AzureStorageCredentialsDetails_v12 credentialsTwo = 
createCredentialDetails("accountTwo");
+        final AzureStorageCredentialsDetails_v12 credentialsThree = 
createCredentialDetails("accountOne"); // separate object built from the same values as credentialsOne
+
+        final BlobServiceClient clientOne = 
clientFactory.getStorageClient(credentialsOne);
+        final BlobServiceClient clientTwo = 
clientFactory.getStorageClient(credentialsTwo);
+        final BlobServiceClient clientThree = 
clientFactory.getStorageClient(credentialsThree);
+
+        assertNotSame(clientOne, clientTwo);
+        assertSame(clientOne, clientThree); // presumably relies on value-based equals/hashCode of the credentials details - verify against factory cache
+    }
+
+    private AzureStorageCredentialsDetails_v12 createCredentialDetails(String 
accountName) { // account-key credentials; only the account name varies between calls
+        return 
AzureStorageCredentialsDetails_v12.createWithAccountKey(accountName, 
"dfs.core.windows.net", "accountKey");
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup_v12.java
new file mode 100644
index 0000000000..310b11acda
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup_v12.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestAzureStorageCredentialsControllerServiceLookup_v12 { // exercises the lookup service against two mock credentials services
+
+    private MockAzureStorageCredentialsService serviceA;
+    private MockAzureStorageCredentialsService serviceB;
+
+    private AzureStorageCredentialsControllerServiceLookup_v12 lookupService;
+    private TestRunner runner;
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        serviceA = new 
MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12.createWithAccountKey("Account_Name_A",
 "core.windows.net", "Account_Key")); // mock returning account-key credentials
+        serviceB = new 
MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12.createWithSasToken("Account_Name_B",
 "core.windows.net", "SAS_Token")); // mock returning SAS-token credentials
+
+        lookupService = new 
AzureStorageCredentialsControllerServiceLookup_v12();
+
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+        final String serviceAIdentifier = "service-a";
+        runner.addControllerService(serviceAIdentifier, serviceA);
+
+        final String serviceBIdentifier = "service-b";
+        runner.addControllerService(serviceBIdentifier, serviceB);
+
+        runner.addControllerService("lookup-service", lookupService);
+        runner.setProperty(lookupService, "a", serviceAIdentifier); // dynamic property "a" -> service-a
+        runner.setProperty(lookupService, "b", serviceBIdentifier); // dynamic property "b" -> service-b
+
+        runner.enableControllerService(serviceA);
+        runner.enableControllerService(serviceB);
+        runner.enableControllerService(lookupService);
+    }
+
+    @Test
+    public void testLookupServiceA() {
+        final Map<String,String> attributes = new HashMap<>();
+        
attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE,
 "a"); // select the service registered under "a"
+
+        final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = 
lookupService.getCredentialsDetails(attributes);
+        assertNotNull(storageCredentialsDetails);
+        assertEquals("Account_Name_A", 
storageCredentialsDetails.getAccountName());
+        assertEquals("core.windows.net", 
storageCredentialsDetails.getEndpointSuffix());
+        assertEquals("Account_Key", storageCredentialsDetails.getAccountKey());
+        assertNull(storageCredentialsDetails.getSasToken()); // serviceA was built with an account key, so no SAS token is expected
+    }
+
+    @Test
+    public void testLookupServiceB() {
+        final Map<String, String> attributes = new HashMap<>();
+        
attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE,
 "b"); // select the service registered under "b"
+
+        final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = 
lookupService.getCredentialsDetails(attributes);
+        assertNotNull(storageCredentialsDetails);
+        assertEquals("Account_Name_B", 
storageCredentialsDetails.getAccountName());
+        assertEquals("core.windows.net", 
storageCredentialsDetails.getEndpointSuffix());
+        assertEquals("SAS_Token", storageCredentialsDetails.getSasToken());
+        assertNull(storageCredentialsDetails.getAccountKey()); // serviceB was built with a SAS token, so no account key is expected
+    }
+
+    @Test
+    public void testLookupMissingCredentialsNameAttribute() {
+        final Map<String, String> attributes = new HashMap<>();
+        assertThrows(ProcessException.class, () -> 
lookupService.getCredentialsDetails(attributes)); // empty map: the lookup attribute is absent
+    }
+
+    @Test
+    public void testLookupWithCredentialsNameThatDoesNotExist() {
+        final Map<String, String> attributes = new HashMap<>();
+        
attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE,
 "DOES-NOT-EXIST"); // setup() registers only "a" and "b"
+        assertThrows(ProcessException.class, () -> 
lookupService.getCredentialsDetails(attributes));
+    }
+
+    @Test
+    public void testCustomValidateAtLeaseOneServiceDefined() throws 
InitializationException { // NOTE(review): "AtLease" in the method name looks like a typo for "AtLeast"
+        // add lookup service to a fresh runner with no services registered, verify not valid
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("lookup-service", lookupService);
+        runner.assertNotValid(lookupService);
+
+        final String serviceAIdentifier = "service-a";
+        runner.addControllerService(serviceAIdentifier, serviceA);
+
+        // register a service and now verify valid
+        runner.setProperty(lookupService, "a", serviceAIdentifier);
+        runner.enableControllerService(lookupService);
+        runner.assertValid(lookupService);
+    }
+
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws 
InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class); // fresh runner replaces the one built in setup()
+        runner.addControllerService("lookup-service", lookupService);
+        runner.setProperty(lookupService, "lookup-service", "lookup-service"); // property value is the lookup service's own identifier
+        runner.assertNotValid(lookupService);
+    }
+
+    /**
+     * A mock AzureStorageCredentialsService_v12 that will always return the 
passed in AzureStorageCredentialsDetails_v12.
+     */
+    private static class MockAzureStorageCredentialsService extends 
AbstractControllerService implements AzureStorageCredentialsService_v12 {
+
+        private final AzureStorageCredentialsDetails_v12 
storageCredentialsDetails;
+
+        MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12 
storageCredentialsDetails) {
+            this.storageCredentialsDetails = storageCredentialsDetails;
+        }
+
+        @Override
+        public AzureStorageCredentialsDetails_v12 
getCredentialsDetails(Map<String, String> attributes) {
+            return storageCredentialsDetails; // the attributes argument is ignored
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
index 6bda55e609..d3de995a98 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
@@ -23,6 +23,8 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+
 import static 
org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX;
 import static 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12.ACCOUNT_NAME;
 import static 
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12.CREDENTIALS_TYPE;
@@ -158,7 +160,7 @@ public class 
TestAzureStorageCredentialsControllerService_v12 {
 
         runner.enableControllerService(credentialsService);
 
-        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails();
+        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails(Collections.emptyMap());
 
         assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
         assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -178,7 +180,7 @@ public class 
TestAzureStorageCredentialsControllerService_v12 {
 
         runner.enableControllerService(credentialsService);
 
-        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails();
+        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails(Collections.emptyMap());
 
         assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
         assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -197,7 +199,7 @@ public class 
TestAzureStorageCredentialsControllerService_v12 {
 
         runner.enableControllerService(credentialsService);
 
-        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails();
+        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails(Collections.emptyMap());
 
         assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
         assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -219,7 +221,7 @@ public class 
TestAzureStorageCredentialsControllerService_v12 {
 
         runner.enableControllerService(credentialsService);
 
-        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails();
+        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails(Collections.emptyMap());
 
         assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
         assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -240,7 +242,7 @@ public class 
TestAzureStorageCredentialsControllerService_v12 {
 
         runner.enableControllerService(credentialsService);
 
-        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails();
+        AzureStorageCredentialsDetails_v12 actual = 
credentialsService.getCredentialsDetails(Collections.emptyMap());
 
         assertEquals(ENDPOINT_SUFFIX_VALUE, actual.getEndpointSuffix());
     }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
index 6ab154552a..68457256d6 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
@@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
 
 import com.azure.core.credential.AccessToken;
 
+import java.util.Objects;
+
 public class AzureStorageCredentialsDetails_v12 {
 
     private final String accountName;
@@ -86,6 +88,45 @@ public class AzureStorageCredentialsDetails_v12 {
         return accessToken;
     }
 
+    @Override
+    public boolean equals(Object o) { // value equality over every credentials field of this class
+        if (this == o) { // same reference
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) { // null, or any other runtime class (subclasses included), is never equal
+            return false;
+        }
+
+        AzureStorageCredentialsDetails_v12 that = 
(AzureStorageCredentialsDetails_v12) o;
+        return credentialsType == that.credentialsType // reference comparison; credentialsType is presumably an enum - TODO confirm
+                && Objects.equals(accountName, that.accountName)
+                && Objects.equals(endpointSuffix, that.endpointSuffix)
+                && Objects.equals(accountKey, that.accountKey)
+                && Objects.equals(sasToken, that.sasToken)
+                && Objects.equals(managedIdentityClientId, 
that.managedIdentityClientId)
+                && Objects.equals(servicePrincipalTenantId, 
that.servicePrincipalTenantId)
+                && Objects.equals(servicePrincipalClientId, 
that.servicePrincipalClientId)
+                && Objects.equals(servicePrincipalClientSecret, 
that.servicePrincipalClientSecret)
+                && Objects.equals(accessToken, that.accessToken); // NOTE(review): result depends on AccessToken.equals; if that class does not override it this is an identity check - confirm
+    }
+
+    @Override
+    public int hashCode() { // hashes exactly the ten fields that equals() compares
+        return Objects.hash(
+                credentialsType,
+                accountName,
+                endpointSuffix,
+                accountKey,
+                sasToken,
+                managedIdentityClientId,
+                servicePrincipalTenantId,
+                servicePrincipalClientId,
+                servicePrincipalClientSecret,
+                accessToken // NOTE(review): contributes AccessToken.hashCode(); identity-based unless that class overrides it - confirm
+        );
+    }
+
     public static AzureStorageCredentialsDetails_v12 createWithAccountKey(
             String accountName,
             String endpointSuffix,
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java
index 06422309c8..27f2630388 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java
@@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
 
 import org.apache.nifi.controller.ControllerService;
 
+import java.util.Map;
+
 /**
  * Service interface to provide Azure credentials details for processors using 
Azure Storage Java v12 client library.
  */
@@ -25,7 +27,8 @@ public interface AzureStorageCredentialsService_v12 extends 
ControllerService {
 
     /**
      * Get AzureStorageCredentialsDetails_v12 object which contains the 
Storage Account Name, the Storage Service Endpoint Suffix and the parameters of 
the Storage Credentials
+     * @param attributes FlowFile attributes (typically)
      * @return AzureStorageCredentialsDetails_v12 object
      */
-    AzureStorageCredentialsDetails_v12 getCredentialsDetails();
+    AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map<String, 
String> attributes);
 }

Reply via email to