This is an automated email from the ASF dual-hosted git repository.
nsabonyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 963518d943 NIFI-11586 Added
AzureStorageCredentialsControllerServiceLookup_v12
963518d943 is described below
commit 963518d943c75c8d053834be51c2947470767229
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue May 23 22:50:47 2023 +0200
NIFI-11586 Added AzureStorageCredentialsControllerServiceLookup_v12
Also added client caching in the Blob _v12 processors, which was needed to
support multiple credentials provided by the new lookup service
This closes #7300
Signed-off-by: Nandor Soma Abonyi <[email protected]>
---
.../azure/AbstractAzureBlobProcessor_v12.java | 68 ++--------
.../AbstractAzureDataLakeStorageProcessor.java | 2 +-
.../azure/storage/DeleteAzureBlobStorage_v12.java | 2 +-
.../azure/storage/FetchAzureBlobStorage_v12.java | 2 +-
.../azure/storage/ListAzureBlobStorage_v12.java | 15 ++-
.../azure/storage/ListAzureDataLakeStorage.java | 2 +-
.../azure/storage/PutAzureBlobStorage_v12.java | 2 +-
.../utils/AbstractStorageClientFactory.java | 63 +++++++++
.../storage/utils/BlobServiceClientFactory.java | 79 +++++++++++
.../utils/DataLakeServiceClientFactory.java | 51 +------
...rageCredentialsControllerServiceLookup_v12.java | 56 ++++++++
...ureStorageCredentialsControllerService_v12.java | 3 +-
.../org.apache.nifi.controller.ControllerService | 1 +
.../utils/BlobServiceClientFactoryTest.java | 80 +++++++++++
...rageCredentialsControllerServiceLookup_v12.java | 147 +++++++++++++++++++++
...ureStorageCredentialsControllerService_v12.java | 12 +-
.../AzureStorageCredentialsDetails_v12.java | 41 ++++++
.../AzureStorageCredentialsService_v12.java | 5 +-
18 files changed, 513 insertions(+), 118 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
index c1dd3dfdad..a131b2ec92 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
@@ -16,29 +16,22 @@
*/
package org.apache.nifi.processors.azure;
-import com.azure.core.credential.AzureSasCredential;
-import com.azure.core.credential.TokenCredential;
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobProperties;
-import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
-import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
@@ -92,7 +85,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends
AbstractProcessor {
REL_FAILURE
)));
- private BlobServiceClient storageClient;
+ private volatile BlobServiceClientFactory clientFactory;
@Override
public Set<Relationship> getRelationships() {
@@ -101,64 +94,23 @@ public abstract class AbstractAzureBlobProcessor_v12
extends AbstractProcessor {
@OnScheduled
public void onScheduled(ProcessContext context) {
- storageClient = createStorageClient(context);
+ clientFactory = new BlobServiceClientFactory(getLogger(),
getProxyOptions(context));
}
@OnStopped
public void onStopped() {
- storageClient = null;
+ clientFactory = null;
}
- protected BlobServiceClient getStorageClient() {
- return storageClient;
- }
+ protected BlobServiceClient getStorageClient(PropertyContext context,
FlowFile flowFile) {
+ final Map<String, String> attributes = flowFile != null ?
flowFile.getAttributes() : Collections.emptyMap();
- public static BlobServiceClient createStorageClient(PropertyContext
context) {
final AzureStorageCredentialsService_v12 credentialsService =
context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
- final AzureStorageCredentialsDetails_v12 credentialsDetails =
credentialsService.getCredentialsDetails();
-
- final BlobServiceClientBuilder clientBuilder = new
BlobServiceClientBuilder();
- clientBuilder.endpoint(String.format("https://%s.%s",
credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
-
- final NettyAsyncHttpClientBuilder nettyClientBuilder = new
NettyAsyncHttpClientBuilder();
-
- nettyClientBuilder.proxy(getProxyOptions(context));
+ final AzureStorageCredentialsDetails_v12 credentialsDetails =
credentialsService.getCredentialsDetails(attributes);
- final HttpClient nettyClient = nettyClientBuilder.build();
- clientBuilder.httpClient(nettyClient);
+ final BlobServiceClient storageClient =
clientFactory.getStorageClient(credentialsDetails);
- configureCredential(clientBuilder, credentialsService,
credentialsDetails);
-
- return clientBuilder.buildClient();
- }
-
- private static void configureCredential(BlobServiceClientBuilder
clientBuilder, AzureStorageCredentialsService_v12 credentialsService,
AzureStorageCredentialsDetails_v12 credentialsDetails) {
- switch (credentialsDetails.getCredentialsType()) {
- case ACCOUNT_KEY:
- clientBuilder.credential(new
StorageSharedKeyCredential(credentialsDetails.getAccountName(),
credentialsDetails.getAccountKey()));
- break;
- case SAS_TOKEN:
- clientBuilder.credential(new
AzureSasCredential(credentialsDetails.getSasToken()));
- break;
- case MANAGED_IDENTITY:
- clientBuilder.credential(new ManagedIdentityCredentialBuilder()
-
.clientId(credentialsDetails.getManagedIdentityClientId())
- .build());
- break;
- case SERVICE_PRINCIPAL:
- clientBuilder.credential(new ClientSecretCredentialBuilder()
-
.tenantId(credentialsDetails.getServicePrincipalTenantId())
-
.clientId(credentialsDetails.getServicePrincipalClientId())
-
.clientSecret(credentialsDetails.getServicePrincipalClientSecret())
- .build());
- break;
- case ACCESS_TOKEN:
- TokenCredential credential = tokenRequestContext ->
Mono.just(credentialsService.getCredentialsDetails().getAccessToken());
- clientBuilder.credential(credential);
- break;
- default:
- throw new IllegalArgumentException("Unhandled credentials
type: " + credentialsDetails.getCredentialsType());
- }
+ return storageClient;
}
protected Map<String, String> createBlobAttributesMap(BlobClient
blobClient) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 41e677d0a8..b16176f3fd 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -95,7 +95,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor
extends AbstractProc
public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
- private DataLakeServiceClientFactory clientFactory;
+ private volatile DataLakeServiceClientFactory clientFactory;
@Override
public Set<Relationship> getRelationships() {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
index b49cbd0247..6f820588bd 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
@@ -101,7 +101,7 @@ public class DeleteAzureBlobStorage_v12 extends
AbstractAzureBlobProcessor_v12 {
long startNanos = System.nanoTime();
try {
- BlobServiceClient storageClient = getStorageClient();
+ BlobServiceClient storageClient = getStorageClient(context,
flowFile);
BlobContainerClient containerClient =
storageClient.getBlobContainerClient(containerName);
BlobClient blobClient = containerClient.getBlobClient(blobName);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
index 775c9d7ab0..4d9db2d5b1 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
@@ -152,7 +152,7 @@ public class FetchAzureBlobStorage_v12 extends
AbstractAzureBlobProcessor_v12 im
Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ?
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
: null);
try {
- BlobServiceClient storageClient = getStorageClient();
+ BlobServiceClient storageClient = getStorageClient(context,
flowFile);
BlobContainerClient containerClient =
storageClient.getBlobContainerClient(containerName);
final BlobClient blobClient;
if (isClientSideEncryptionEnabled(context)) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
index 9a36abf13f..0077f6f392 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
@@ -46,8 +46,11 @@ import
org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
+import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
+import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
+import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
import java.io.IOException;
import java.util.ArrayList;
@@ -59,7 +62,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE;
-import static
org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.createStorageClient;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -148,7 +151,7 @@ public class ListAzureBlobStorage_v12 extends
AbstractListAzureProcessor<BlobInf
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
- private BlobServiceClient storageClient;
+ private volatile BlobServiceClientFactory clientFactory;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -157,12 +160,12 @@ public class ListAzureBlobStorage_v12 extends
AbstractListAzureProcessor<BlobInf
@OnScheduled
public void onScheduled(ProcessContext context) {
- storageClient = createStorageClient(context);
+ clientFactory = new BlobServiceClientFactory(getLogger(),
getProxyOptions(context));
}
@OnStopped
public void onStopped() {
- storageClient = null;
+ clientFactory = null;
}
@Override
@@ -215,6 +218,10 @@ public class ListAzureBlobStorage_v12 extends
AbstractListAzureProcessor<BlobInf
try {
final List<BlobInfo> listing = new ArrayList<>();
+ final AzureStorageCredentialsService_v12 credentialsService =
context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
+ final AzureStorageCredentialsDetails_v12 credentialsDetails =
credentialsService.getCredentialsDetails(Collections.emptyMap());
+ final BlobServiceClient storageClient =
clientFactory.getStorageClient(credentialsDetails);
+
final BlobContainerClient containerClient =
storageClient.getBlobContainerClient(containerName);
final ListBlobsOptions options = new ListBlobsOptions()
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index b568722e9d..5cfadc3430 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -175,7 +175,7 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
private volatile Pattern filePattern;
private volatile Pattern pathPattern;
- private DataLakeServiceClientFactory clientFactory;
+ private volatile DataLakeServiceClientFactory clientFactory;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index a0aeb4e297..a7dfca488f 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -160,7 +160,7 @@ public class PutAzureBlobStorage_v12 extends
AbstractAzureBlobProcessor_v12 impl
long startNanos = System.nanoTime();
try {
- BlobServiceClient storageClient = getStorageClient();
+ BlobServiceClient storageClient = getStorageClient(context,
flowFile);
BlobContainerClient containerClient =
storageClient.getBlobContainerClient(containerName);
if (createContainer && !containerClient.exists()) {
containerClient.create();
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AbstractStorageClientFactory.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AbstractStorageClientFactory.java
new file mode 100644
index 0000000000..0c9d9df111
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AbstractStorageClientFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.core.http.ProxyOptions;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.logging.ComponentLog;
+
+abstract class AbstractStorageClientFactory<CREDENTIAL, CLIENT> {
+
+ private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
+
+ private final ComponentLog logger;
+ private final ProxyOptions proxyOptions;
+
+ private final Cache<CREDENTIAL, CLIENT> clientCache;
+
+ protected AbstractStorageClientFactory(final ComponentLog logger, final
ProxyOptions proxyOptions) {
+ this.logger = logger;
+ this.proxyOptions = proxyOptions;
+ this.clientCache = createCache();
+ }
+
+ private Cache<CREDENTIAL, CLIENT> createCache() {
+ // Beware! By default, Caffeine does not perform cleanup and evict
values
+ // "automatically" or instantly after a value expires. Because of that
it
+ // can happen that there are more elements in the cache than the
maximum size.
+ // See: https://github.com/ben-manes/caffeine/wiki/Cleanup
+ return Caffeine.newBuilder()
+ .maximumSize(STORAGE_CLIENT_CACHE_SIZE)
+ .build();
+ }
+
+ /**
+ * Retrieves storage client object
+ *
+ * @param credentialsDetails used for caching because it can contain
properties that are results of an expression
+ * @return CLIENT
+ */
+ public CLIENT getStorageClient(final CREDENTIAL credentialsDetails) {
+ return clientCache.get(credentialsDetails, __ -> {
+ logger.debug(credentialsDetails.getClass().getSimpleName() + " is
not found in the cache with the given credentials. Creating it.");
+ return createStorageClient(credentialsDetails, proxyOptions);
+ });
+ }
+
+ protected abstract CLIENT createStorageClient(CREDENTIAL
credentialsDetails, ProxyOptions proxyOptions);
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
new file mode 100644
index 0000000000..b5a354b145
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.util.ClientOptions;
+import com.azure.core.util.HttpClientOptions;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.nifi.logging.ComponentLog;
+import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
+import reactor.core.publisher.Mono;
+
+public class BlobServiceClientFactory extends
AbstractStorageClientFactory<AzureStorageCredentialsDetails_v12,
BlobServiceClient> {
+
+ public BlobServiceClientFactory(final ComponentLog logger, final
ProxyOptions proxyOptions) {
+ super(logger, proxyOptions);
+ }
+
+ protected BlobServiceClient createStorageClient(final
AzureStorageCredentialsDetails_v12 credentialsDetails, final ProxyOptions
proxyOptions) {
+ final BlobServiceClientBuilder clientBuilder = new
BlobServiceClientBuilder();
+ clientBuilder.endpoint(String.format("https://%s.%s",
credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
+
+ final ClientOptions clientOptions = new
HttpClientOptions().setProxyOptions(proxyOptions);
+ clientBuilder.clientOptions(clientOptions);
+
+ configureCredential(clientBuilder, credentialsDetails);
+
+ return clientBuilder.buildClient();
+ }
+
+ private void configureCredential(final BlobServiceClientBuilder
clientBuilder, final AzureStorageCredentialsDetails_v12 credentialsDetails) {
+ switch (credentialsDetails.getCredentialsType()) {
+ case ACCOUNT_KEY:
+ clientBuilder.credential(new
StorageSharedKeyCredential(credentialsDetails.getAccountName(),
credentialsDetails.getAccountKey()));
+ break;
+ case SAS_TOKEN:
+ clientBuilder.credential(new
AzureSasCredential(credentialsDetails.getSasToken()));
+ break;
+ case MANAGED_IDENTITY:
+ clientBuilder.credential(new ManagedIdentityCredentialBuilder()
+
.clientId(credentialsDetails.getManagedIdentityClientId())
+ .build());
+ break;
+ case SERVICE_PRINCIPAL:
+ clientBuilder.credential(new ClientSecretCredentialBuilder()
+
.tenantId(credentialsDetails.getServicePrincipalTenantId())
+
.clientId(credentialsDetails.getServicePrincipalClientId())
+
.clientSecret(credentialsDetails.getServicePrincipalClientSecret())
+ .build());
+ break;
+ case ACCESS_TOKEN:
+ TokenCredential credential = tokenRequestContext ->
Mono.just(credentialsDetails.getAccessToken());
+ clientBuilder.credential(credential);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled credentials
type: " + credentialsDetails.getCredentialsType());
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
index 672618beb8..51ee6ff9b3 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
@@ -18,9 +18,9 @@ package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
-import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.core.util.ClientOptions;
+import com.azure.core.util.HttpClientOptions;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
@@ -28,52 +28,18 @@ import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import reactor.core.publisher.Mono;
-public class DataLakeServiceClientFactory {
-
- private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
-
- private final ComponentLog logger;
- private final ProxyOptions proxyOptions;
-
- private final Cache<ADLSCredentialsDetails, DataLakeServiceClient>
clientCache;
+public class DataLakeServiceClientFactory extends
AbstractStorageClientFactory<ADLSCredentialsDetails, DataLakeServiceClient> {
public DataLakeServiceClientFactory(ComponentLog logger, ProxyOptions
proxyOptions) {
- this.logger = logger;
- this.proxyOptions = proxyOptions;
- this.clientCache = createCache();
- }
-
- private Cache<ADLSCredentialsDetails, DataLakeServiceClient> createCache()
{
- // Beware! By default, Caffeine does not perform cleanup and evict
values
- // "automatically" or instantly after a value expires. Because of that
it
- // can happen that there are more elements in the cache than the
maximum size.
- // See: https://github.com/ben-manes/caffeine/wiki/Cleanup
- return Caffeine.newBuilder()
- .maximumSize(STORAGE_CLIENT_CACHE_SIZE)
- .build();
- }
-
- /**
- * Retrieves a {@link DataLakeServiceClient}
- *
- * @param credentialsDetails used for caching because it can contain
properties that are results of an expression
- * @return DataLakeServiceClient
- */
- public DataLakeServiceClient getStorageClient(ADLSCredentialsDetails
credentialsDetails) {
- return clientCache.get(credentialsDetails, __ -> {
- logger.debug("DataLakeServiceClient is not found in the cache with
the given credentials. Creating it.");
- return createStorageClient(credentialsDetails, proxyOptions);
- });
+ super(logger, proxyOptions);
}
- private static DataLakeServiceClient
createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions
proxyOptions) {
+ protected DataLakeServiceClient createStorageClient(ADLSCredentialsDetails
credentialsDetails, ProxyOptions proxyOptions) {
final String accountName = credentialsDetails.getAccountName();
final String accountKey = credentialsDetails.getAccountKey();
final String sasToken = credentialsDetails.getSasToken();
@@ -114,11 +80,8 @@ public class DataLakeServiceClientFactory {
throw new IllegalArgumentException("No valid credentials were
provided");
}
- final NettyAsyncHttpClientBuilder nettyClientBuilder = new
NettyAsyncHttpClientBuilder();
- nettyClientBuilder.proxy(proxyOptions);
-
- final HttpClient nettyClient = nettyClientBuilder.build();
- dataLakeServiceClientBuilder.httpClient(nettyClient);
+ final ClientOptions clientOptions = new
HttpClientOptions().setProxyOptions(proxyOptions);
+ dataLakeServiceClientBuilder.clientOptions(clientOptions);
return dataLakeServiceClientBuilder.buildClient();
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup_v12.java
new file mode 100644
index 0000000000..5b2e19fbde
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup_v12.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import
org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
+
+import java.util.Map;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue",
"credentials" })
+@CapabilityDescription("Provides an AzureStorageCredentialsService_v12 that
can be used to dynamically select another AzureStorageCredentialsService_v12. "
+
+ "This service requires an attribute named
'azure.storage.credentials.name' to be passed in, and will throw an exception
if the attribute is missing. " +
+ "The value of 'azure.storage.credentials.name' will be used to select
the AzureStorageCredentialsService_v12 that has been registered with that name.
" +
+ "This will allow multiple AzureStorageCredentialsServices_v12 to be
defined and registered, and then selected dynamically at runtime by tagging
flow files " +
+ "with the appropriate 'azure.storage.credentials.name' attribute.")
+@DynamicProperty(name = "The name to register
AzureStorageCredentialsService_v12", value = "The
AzureStorageCredentialsService_v12",
+ description = "If '" +
AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE
+ "' attribute contains " +
+ "the name of the dynamic property, then the
AzureStorageCredentialsService_v12 (registered in the value) will be selected.",
+ expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class AzureStorageCredentialsControllerServiceLookup_v12
+ extends
AbstractSingleAttributeBasedControllerServiceLookup<AzureStorageCredentialsService_v12>
implements AzureStorageCredentialsService_v12 {
+
+ public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE =
"azure.storage.credentials.name";
+
+ @Override
+ protected String getLookupAttribute() {
+ return AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE;
+ }
+
+ @Override
+ public Class<AzureStorageCredentialsService_v12> getServiceType() {
+ return AzureStorageCredentialsService_v12.class;
+ }
+
+ @Override
+ public AzureStorageCredentialsDetails_v12 getCredentialsDetails(final
Map<String, String> attributes) {
+ return lookupService(attributes).getCredentialsDetails(attributes);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
index ba8101d90d..696f86d27f 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java
@@ -29,6 +29,7 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* Provides credentials details for Azure Blob processors
@@ -133,7 +134,7 @@ public class AzureStorageCredentialsControllerService_v12 extends AbstractContro
}
@Override
- public AzureStorageCredentialsDetails_v12 getCredentialsDetails() {
+ public AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map<String, String> attributes) {
String accountName = context.getProperty(ACCOUNT_NAME).getValue();
String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).getValue();
AzureStorageCredentialsType credentialsType = AzureStorageCredentialsType.valueOf(context.getProperty(CREDENTIALS_TYPE).getValue());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 0e5796ac58..42cd395867 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -20,4 +20,5 @@ org.apache.nifi.services.azure.storage.ADLSCredentialsControllerServiceLookup
org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService
org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
+org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactoryTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactoryTest.java
new file mode 100644
index 0000000000..986788795e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.storage.blob.BlobServiceClient;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+@ExtendWith(MockitoExtension.class)
+class BlobServiceClientFactoryTest {
+
+    @Mock
+    private ComponentLog logger;
+
+    @Test
+    void testThatServiceClientIsCachedByCredentials() {
+        final BlobServiceClientFactory clientFactory = new BlobServiceClientFactory(logger, null);
+
+        final AzureStorageCredentialsDetails_v12 credentials = createCredentialDetails("account");
+
+        final BlobServiceClient clientOne = clientFactory.getStorageClient(credentials);
+        final BlobServiceClient clientTwo = clientFactory.getStorageClient(credentials);
+
+        assertSame(clientOne, clientTwo);
+    }
+
+    @Test
+    void testThatDifferentServiceClientIsReturnedForDifferentCredentials() {
+        final BlobServiceClientFactory clientFactory = new BlobServiceClientFactory(logger, null);
+
+        final AzureStorageCredentialsDetails_v12 credentialsOne = createCredentialDetails("accountOne");
+        final AzureStorageCredentialsDetails_v12 credentialsTwo = createCredentialDetails("accountTwo");
+
+        final BlobServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
+        final BlobServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
+
+        assertNotSame(clientOne, clientTwo);
+    }
+
+    @Test
+    void testThatCachedClientIsReturnedAfterDifferentClientIsCreated() {
+        final BlobServiceClientFactory clientFactory = new BlobServiceClientFactory(logger, null);
+
+        final AzureStorageCredentialsDetails_v12 credentialsOne = createCredentialDetails("accountOne");
+        final AzureStorageCredentialsDetails_v12 credentialsTwo = createCredentialDetails("accountTwo");
+        final AzureStorageCredentialsDetails_v12 credentialsThree = createCredentialDetails("accountOne");
+
+        final BlobServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
+        final BlobServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
+        final BlobServiceClient clientThree = clientFactory.getStorageClient(credentialsThree);
+
+        assertNotSame(clientOne, clientTwo);
+        assertSame(clientOne, clientThree);
+    }
+
+    private AzureStorageCredentialsDetails_v12 createCredentialDetails(String accountName) {
+        return AzureStorageCredentialsDetails_v12.createWithAccountKey(accountName, "dfs.core.windows.net", "accountKey");
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup_v12.java
new file mode 100644
index 0000000000..310b11acda
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerServiceLookup_v12.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestAzureStorageCredentialsControllerServiceLookup_v12 {
+
+    private MockAzureStorageCredentialsService serviceA;
+    private MockAzureStorageCredentialsService serviceB;
+
+    private AzureStorageCredentialsControllerServiceLookup_v12 lookupService;
+    private TestRunner runner;
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        serviceA = new MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12.createWithAccountKey("Account_Name_A", "core.windows.net", "Account_Key"));
+        serviceB = new MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12.createWithSasToken("Account_Name_B", "core.windows.net", "SAS_Token"));
+
+        lookupService = new AzureStorageCredentialsControllerServiceLookup_v12();
+
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+        final String serviceAIdentifier = "service-a";
+        runner.addControllerService(serviceAIdentifier, serviceA);
+
+        final String serviceBIdentifier = "service-b";
+        runner.addControllerService(serviceBIdentifier, serviceB);
+
+        runner.addControllerService("lookup-service", lookupService);
+        runner.setProperty(lookupService, "a", serviceAIdentifier);
+        runner.setProperty(lookupService, "b", serviceBIdentifier);
+
+        runner.enableControllerService(serviceA);
+        runner.enableControllerService(serviceB);
+        runner.enableControllerService(lookupService);
+    }
+
+    @Test
+    public void testLookupServiceA() {
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "a");
+
+        final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = lookupService.getCredentialsDetails(attributes);
+        assertNotNull(storageCredentialsDetails);
+        assertEquals("Account_Name_A", storageCredentialsDetails.getAccountName());
+        assertEquals("core.windows.net", storageCredentialsDetails.getEndpointSuffix());
+        assertEquals("Account_Key", storageCredentialsDetails.getAccountKey());
+        assertNull(storageCredentialsDetails.getSasToken());
+    }
+
+    @Test
+    public void testLookupServiceB() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "b");
+
+        final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = lookupService.getCredentialsDetails(attributes);
+        assertNotNull(storageCredentialsDetails);
+        assertEquals("Account_Name_B", storageCredentialsDetails.getAccountName());
+        assertEquals("core.windows.net", storageCredentialsDetails.getEndpointSuffix());
+        assertEquals("SAS_Token", storageCredentialsDetails.getSasToken());
+        assertNull(storageCredentialsDetails.getAccountKey());
+    }
+
+    @Test
+    public void testLookupMissingCredentialsNameAttribute() {
+        final Map<String, String> attributes = new HashMap<>();
+        assertThrows(ProcessException.class, () -> lookupService.getCredentialsDetails(attributes));
+    }
+
+    @Test
+    public void testLookupWithCredentialsNameThatDoesNotExist() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "DOES-NOT-EXIST");
+        assertThrows(ProcessException.class, () -> lookupService.getCredentialsDetails(attributes));
+    }
+
+    @Test
+    public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationException {
+        // enable lookup service with no services registered, verify not valid
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("lookup-service", lookupService);
+        runner.assertNotValid(lookupService);
+
+        final String serviceAIdentifier = "service-a";
+        runner.addControllerService(serviceAIdentifier, serviceA);
+
+        // register a service and now verify valid
+        runner.setProperty(lookupService, "a", serviceAIdentifier);
+        runner.enableControllerService(lookupService);
+        runner.assertValid(lookupService);
+    }
+
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("lookup-service", lookupService);
+        runner.setProperty(lookupService, "lookup-service", "lookup-service");
+        runner.assertNotValid(lookupService);
+    }
+
+    /**
+     * A mock AzureStorageCredentialsService_v12 that will always return the passed in AzureStorageCredentialsDetails_v12.
+     */
+    private static class MockAzureStorageCredentialsService extends AbstractControllerService implements AzureStorageCredentialsService_v12 {
+
+        private final AzureStorageCredentialsDetails_v12 storageCredentialsDetails;
+
+        MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12 storageCredentialsDetails) {
+            this.storageCredentialsDetails = storageCredentialsDetails;
+        }
+
+        @Override
+        public AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map<String, String> attributes) {
+            return storageCredentialsDetails;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
index 6bda55e609..d3de995a98 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java
@@ -23,6 +23,8 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+
import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX;
import static org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12.ACCOUNT_NAME;
import static org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12.CREDENTIALS_TYPE;
@@ -158,7 +160,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
- AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
+ AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -178,7 +180,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
- AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
+ AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -197,7 +199,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
- AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
+ AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -219,7 +221,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
- AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
+ AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@@ -240,7 +242,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
- AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
+ AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ENDPOINT_SUFFIX_VALUE, actual.getEndpointSuffix());
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
index 6ab154552a..68457256d6 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java
@@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
import com.azure.core.credential.AccessToken;
+import java.util.Objects;
+
public class AzureStorageCredentialsDetails_v12 {
private final String accountName;
@@ -86,6 +88,45 @@ public class AzureStorageCredentialsDetails_v12 {
return accessToken;
}
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        AzureStorageCredentialsDetails_v12 that = (AzureStorageCredentialsDetails_v12) o;
+        return credentialsType == that.credentialsType
+                && Objects.equals(accountName, that.accountName)
+                && Objects.equals(endpointSuffix, that.endpointSuffix)
+                && Objects.equals(accountKey, that.accountKey)
+                && Objects.equals(sasToken, that.sasToken)
+                && Objects.equals(managedIdentityClientId, that.managedIdentityClientId)
+                && Objects.equals(servicePrincipalTenantId, that.servicePrincipalTenantId)
+                && Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId)
+                && Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret)
+                && Objects.equals(accessToken, that.accessToken);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                credentialsType,
+                accountName,
+                endpointSuffix,
+                accountKey,
+                sasToken,
+                managedIdentityClientId,
+                servicePrincipalTenantId,
+                servicePrincipalClientId,
+                servicePrincipalClientSecret,
+                accessToken
+        );
+    }
+
public static AzureStorageCredentialsDetails_v12 createWithAccountKey(
String accountName,
String endpointSuffix,
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java
index 06422309c8..27f2630388 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsService_v12.java
@@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
import org.apache.nifi.controller.ControllerService;
+import java.util.Map;
+
/**
* Service interface to provide Azure credentials details for processors using Azure Storage Java v12 client library.
*/
@@ -25,7 +27,8 @@ public interface AzureStorageCredentialsService_v12 extends ControllerService {
/**
* Get AzureStorageCredentialsDetails_v12 object which contains the Storage Account Name, the Storage Service Endpoint Suffix and the parameters of the Storage Credentials
+ * @param attributes FlowFile attributes (typically)
* @return AzureStorageCredentialsDetails_v12 object
*/
- AzureStorageCredentialsDetails_v12 getCredentialsDetails();
+ AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map<String, String> attributes);
}
}