This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 320aed024f NIFI-10152 Storage client caching in Azure ADLS processors
320aed024f is described below
commit 320aed024f62ec87ccdd86e6f71e10d75eebd344
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Thu Jun 23 01:25:45 2022 +0200
NIFI-10152 Storage client caching in Azure ADLS processors
This closes #6158.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../src/main/resources/META-INF/NOTICE | 5 +
.../nifi-azure-processors/pom.xml | 7 +-
.../AbstractAzureDataLakeStorageProcessor.java | 80 ++++---------
.../azure/storage/ListAzureDataLakeStorage.java | 14 ++-
.../azure/storage/utils/AzureStorageUtils.java | 4 +-
.../utils/DataLakeServiceClientFactory.java | 125 +++++++++++++++++++++
.../azure/storage/ITListAzureDataLakeStorage.java | 32 +++---
.../utils/DataLakeServiceClientFactoryTest.java | 84 ++++++++++++++
.../azure/storage/ADLSCredentialsDetails.java | 41 +++++++
9 files changed, 310 insertions(+), 82 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
index 5faf77b19f..a4fb770588 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
@@ -195,6 +195,11 @@ The following binary components are provided under the
Apache Software License v
Reactive Streams Netty Driver
Copyright 2020, Project Reactor
+ (ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:2.9.2 -
https://github.com/ben-manes/caffeine)
+ The following NOTICE information applies:
+ Caffeine (caching library)
+ Copyright Ben Manes
+
************************
Common Development and Distribution License 1.0
************************
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 667a7923ed..a6ad604524 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -135,6 +135,12 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>2.9.2</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@@ -159,7 +165,6 @@
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 1a66dda500..41e677d0a8 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -16,18 +16,10 @@
*/
package org.apache.nifi.processors.azure;
-import com.azure.core.credential.AccessToken;
-import com.azure.core.credential.TokenCredential;
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
-import com.azure.identity.ClientSecretCredential;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.identity.ManagedIdentityCredential;
-import com.azure.identity.ManagedIdentityCredentialBuilder;
-import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -40,9 +32,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import
org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
-import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
@@ -51,7 +44,6 @@ import java.util.Map;
import java.util.Set;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
-import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
public abstract class AbstractAzureDataLakeStorageProcessor extends
AbstractProcessor {
@@ -65,7 +57,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor
extends AbstractProc
public static final PropertyDescriptor FILESYSTEM = new
PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
- .description("Name of the Azure Storage File System. It is assumed
to be already existing.")
+ .description("Name of the Azure Storage File System (also called
Container). It is assumed to be already existing.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
@@ -103,65 +95,31 @@ public abstract class
AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
+ private DataLakeServiceClientFactory clientFactory;
+
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
- public static DataLakeServiceClient getStorageClient(PropertyContext
context, FlowFile flowFile) {
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ clientFactory = new DataLakeServiceClientFactory(getLogger(),
AzureStorageUtils.getProxyOptions(context));
+ }
+
+ @OnStopped
+ public void onStopped() {
+ clientFactory = null;
+ }
+
+ public DataLakeServiceClient getStorageClient(PropertyContext context,
FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ?
flowFile.getAttributes() : Collections.emptyMap();
final ADLSCredentialsService credentialsService =
context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
final ADLSCredentialsDetails credentialsDetails =
credentialsService.getCredentialsDetails(attributes);
- final String accountName = credentialsDetails.getAccountName();
- final String accountKey = credentialsDetails.getAccountKey();
- final String sasToken = credentialsDetails.getSasToken();
- final AccessToken accessToken = credentialsDetails.getAccessToken();
- final String endpointSuffix = credentialsDetails.getEndpointSuffix();
- final boolean useManagedIdentity =
credentialsDetails.getUseManagedIdentity();
- final String managedIdentityClientId =
credentialsDetails.getManagedIdentityClientId();
- final String servicePrincipalTenantId =
credentialsDetails.getServicePrincipalTenantId();
- final String servicePrincipalClientId =
credentialsDetails.getServicePrincipalClientId();
- final String servicePrincipalClientSecret =
credentialsDetails.getServicePrincipalClientSecret();
-
- final String endpoint = String.format("https://%s.%s", accountName,
endpointSuffix);
-
- final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new
DataLakeServiceClientBuilder();
- dataLakeServiceClientBuilder.endpoint(endpoint);
-
- if (StringUtils.isNotBlank(accountKey)) {
- final StorageSharedKeyCredential credential = new
StorageSharedKeyCredential(accountName, accountKey);
- dataLakeServiceClientBuilder.credential(credential);
- } else if (StringUtils.isNotBlank(sasToken)) {
- dataLakeServiceClientBuilder.sasToken(sasToken);
- } else if (accessToken != null) {
- final TokenCredential credential = tokenRequestContext ->
Mono.just(accessToken);
- dataLakeServiceClientBuilder.credential(credential);
- } else if (useManagedIdentity) {
- final ManagedIdentityCredential misCredential = new
ManagedIdentityCredentialBuilder()
- .clientId(managedIdentityClientId)
- .build();
- dataLakeServiceClientBuilder.credential(misCredential);
- } else if (StringUtils.isNoneBlank(servicePrincipalTenantId,
servicePrincipalClientId, servicePrincipalClientSecret)) {
- final ClientSecretCredential credential = new
ClientSecretCredentialBuilder()
- .tenantId(servicePrincipalTenantId)
- .clientId(servicePrincipalClientId)
- .clientSecret(servicePrincipalClientSecret)
- .build();
- dataLakeServiceClientBuilder.credential(credential);
- } else {
- throw new IllegalArgumentException("No valid credentials were
provided");
- }
-
- final NettyAsyncHttpClientBuilder nettyClientBuilder = new
NettyAsyncHttpClientBuilder();
- nettyClientBuilder.proxy(getProxyOptions(context));
-
- final HttpClient nettyClient = nettyClientBuilder.build();
- dataLakeServiceClientBuilder.httpClient(nettyClient);
-
- final DataLakeServiceClient storageClient =
dataLakeServiceClientBuilder.buildClient();
+ final DataLakeServiceClient storageClient =
clientFactory.getStorageClient(credentialsDetails);
return storageClient;
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index 24f51b5ae2..601a7e285c 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -43,7 +43,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import
org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import java.io.IOException;
import java.util.Arrays;
@@ -66,7 +69,6 @@ import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProce
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
@@ -170,6 +172,8 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
private volatile Pattern filePattern;
private volatile Pattern pathPattern;
+ private DataLakeServiceClientFactory clientFactory;
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
@@ -179,12 +183,14 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
public void onScheduled(final ProcessContext context) {
filePattern = getPattern(context, FILE_FILTER);
pathPattern = getPattern(context, PATH_FILTER);
+ clientFactory = new DataLakeServiceClientFactory(getLogger(),
AzureStorageUtils.getProxyOptions(context));
}
@OnStopped
public void onStopped() {
filePattern = null;
pathPattern = null;
+ clientFactory = null;
}
@Override
@@ -264,7 +270,11 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
final Pattern filePattern = listingMode == ListingMode.EXECUTION ?
this.filePattern : getPattern(context, FILE_FILTER);
final Pattern pathPattern = listingMode == ListingMode.EXECUTION ?
this.pathPattern : getPattern(context, PATH_FILTER);
- final DataLakeServiceClient storageClient =
getStorageClient(context, null);
+ final ADLSCredentialsService credentialsService =
context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
+
+ final ADLSCredentialsDetails credentialsDetails =
credentialsService.getCredentialsDetails(Collections.emptyMap());
+
+ final DataLakeServiceClient storageClient =
clientFactory.getStorageClient(credentialsDetails);
final DataLakeFileSystemClient fileSystemClient =
storageClient.getFileSystemClient(fileSystem);
final ListPathsOptions options = new ListPathsOptions();
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 654db1469f..5f17fbd43e 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -321,7 +321,7 @@ public final class AzureStorageUtils {
*
* Creates the {@link ProxyOptions proxy options} that {@link HttpClient}
will use.
*
- * @param propertyContext is sed to supply Proxy configurations
+ * @param propertyContext to supply Proxy configurations
* @return {@link ProxyOptions proxy options}, null if Proxy is not set
*/
public static ProxyOptions getProxyOptions(final PropertyContext
propertyContext) {
@@ -342,7 +342,7 @@ public final class AzureStorageUtils {
return proxyOptions;
}
- return null;
+ return null;
}
private static ProxyOptions.Type getProxyType(ProxyConfiguration
proxyConfiguration) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
new file mode 100644
index 0000000000..672618beb8
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.identity.ClientSecretCredential;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.ManagedIdentityCredential;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
+import reactor.core.publisher.Mono;
+
+public class DataLakeServiceClientFactory {
+
+ private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
+
+ private final ComponentLog logger;
+ private final ProxyOptions proxyOptions;
+
+ private final Cache<ADLSCredentialsDetails, DataLakeServiceClient>
clientCache;
+
+ public DataLakeServiceClientFactory(ComponentLog logger, ProxyOptions
proxyOptions) {
+ this.logger = logger;
+ this.proxyOptions = proxyOptions;
+ this.clientCache = createCache();
+ }
+
+ private Cache<ADLSCredentialsDetails, DataLakeServiceClient> createCache()
{
+ // Beware! By default, Caffeine does not perform cleanup and evict values
+ // "automatically" or instantly after a value expires. Because of that it
+ // can happen that there are more elements in the cache than the maximum size.
+ // See: https://github.com/ben-manes/caffeine/wiki/Cleanup
+ return Caffeine.newBuilder()
+ .maximumSize(STORAGE_CLIENT_CACHE_SIZE)
+ .build();
+ }
+
+ /**
+ * Retrieves a {@link DataLakeServiceClient}
+ *
+ * @param credentialsDetails used for caching because it can contain properties that are results of an expression
+ * @return DataLakeServiceClient
+ */
+ public DataLakeServiceClient getStorageClient(ADLSCredentialsDetails
credentialsDetails) {
+ return clientCache.get(credentialsDetails, __ -> {
+ logger.debug("DataLakeServiceClient is not found in the cache with
the given credentials. Creating it.");
+ return createStorageClient(credentialsDetails, proxyOptions);
+ });
+ }
+
+ private static DataLakeServiceClient
createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions
proxyOptions) {
+ final String accountName = credentialsDetails.getAccountName();
+ final String accountKey = credentialsDetails.getAccountKey();
+ final String sasToken = credentialsDetails.getSasToken();
+ final AccessToken accessToken = credentialsDetails.getAccessToken();
+ final String endpointSuffix = credentialsDetails.getEndpointSuffix();
+ final boolean useManagedIdentity =
credentialsDetails.getUseManagedIdentity();
+ final String managedIdentityClientId =
credentialsDetails.getManagedIdentityClientId();
+ final String servicePrincipalTenantId =
credentialsDetails.getServicePrincipalTenantId();
+ final String servicePrincipalClientId =
credentialsDetails.getServicePrincipalClientId();
+ final String servicePrincipalClientSecret =
credentialsDetails.getServicePrincipalClientSecret();
+
+ final String endpoint = String.format("https://%s.%s", accountName,
endpointSuffix);
+
+ final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new
DataLakeServiceClientBuilder();
+ dataLakeServiceClientBuilder.endpoint(endpoint);
+
+ if (StringUtils.isNotBlank(accountKey)) {
+ final StorageSharedKeyCredential credential = new
StorageSharedKeyCredential(accountName, accountKey);
+ dataLakeServiceClientBuilder.credential(credential);
+ } else if (StringUtils.isNotBlank(sasToken)) {
+ dataLakeServiceClientBuilder.sasToken(sasToken);
+ } else if (accessToken != null) {
+ final TokenCredential credential = tokenRequestContext ->
Mono.just(accessToken);
+ dataLakeServiceClientBuilder.credential(credential);
+ } else if (useManagedIdentity) {
+ final ManagedIdentityCredential misCredential = new
ManagedIdentityCredentialBuilder()
+ .clientId(managedIdentityClientId)
+ .build();
+ dataLakeServiceClientBuilder.credential(misCredential);
+ } else if (StringUtils.isNoneBlank(servicePrincipalTenantId,
servicePrincipalClientId, servicePrincipalClientSecret)) {
+ final ClientSecretCredential credential = new
ClientSecretCredentialBuilder()
+ .tenantId(servicePrincipalTenantId)
+ .clientId(servicePrincipalClientId)
+ .clientSecret(servicePrincipalClientSecret)
+ .build();
+ dataLakeServiceClientBuilder.credential(credential);
+ } else {
+ throw new IllegalArgumentException("No valid credentials were
provided");
+ }
+
+ final NettyAsyncHttpClientBuilder nettyClientBuilder = new
NettyAsyncHttpClientBuilder();
+ nettyClientBuilder.proxy(proxyOptions);
+
+ final HttpClient nettyClient = nettyClientBuilder.build();
+ dataLakeServiceClientBuilder.httpClient(nettyClient);
+
+ return dataLakeServiceClientBuilder.buildClient();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
index 978fe9433f..26ee97ee65 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -98,7 +98,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListRootRecursive() throws Exception {
+ public void testListRootRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runProcessor();
@@ -131,7 +131,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListRootNonRecursive() throws Exception {
+ public void testListRootNonRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
@@ -152,7 +152,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListSubdirectoryRecursive() throws Exception {
+ public void testListSubdirectoryRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
runProcessor();
@@ -173,7 +173,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListSubdirectoryNonRecursive() throws Exception {
+ public void testListSubdirectoryNonRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
@@ -194,7 +194,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListWithFileFilter() throws Exception {
+ public void testListWithFileFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
@@ -218,7 +218,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListWithFileFilterWithEL() throws Exception {
+ public void testListWithFileFilterWithEL() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER,
".*file${suffix}$");
runner.setVariable("suffix", "1.*");
@@ -244,7 +244,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListRootWithPathFilter() throws Exception {
+ public void testListRootWithPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
@@ -267,7 +267,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListRootWithPathFilterWithEL() throws Exception {
+ public void testListRootWithPathFilterWithEL() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER,
"${prefix}${suffix}");
runner.setVariable("prefix", "^dir");
@@ -294,7 +294,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListSubdirectoryWithPathFilter() throws Exception {
+ public void testListSubdirectoryWithPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
@@ -315,7 +315,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListRootWithFileAndPathFilter() throws Exception {
+ public void testListRootWithFileAndPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
@@ -339,7 +339,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListEmptyDirectory() throws Exception {
+ public void testListEmptyDirectory() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir3");
runProcessor();
@@ -401,7 +401,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListWithMinAge() throws Exception {
+ public void testListWithMinAge() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
@@ -422,7 +422,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListWithMaxAge() throws Exception {
+ public void testListWithMaxAge() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
@@ -447,7 +447,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListWithMinSize() throws Exception {
+ public void testListWithMinSize() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
@@ -471,7 +471,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
@Test
- public void testListWithMaxSize() throws Exception {
+ public void testListWithMaxSize() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
@@ -496,7 +496,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
runner.run();
}
- private void assertSuccess(String... testFilePaths) throws Exception {
+ private void assertSuccess(String... testFilePaths) {
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS,
testFilePaths.length);
Map<String, TestFile> expectedFiles = new HashMap<>(testFiles);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java
new file mode 100644
index 0000000000..8576553d28
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+@ExtendWith(MockitoExtension.class)
+class DataLakeServiceClientFactoryTest {
+
+ @Mock
+ private ComponentLog logger;
+
+ @Test
+ void testThatServiceClientIsCachedByCredentials() {
+ final DataLakeServiceClientFactory clientFactory = new
DataLakeServiceClientFactory(logger, null);
+
+ final ADLSCredentialsDetails credentials =
createCredentialDetails("account");
+
+ final DataLakeServiceClient clientOne =
clientFactory.getStorageClient(credentials);
+ final DataLakeServiceClient clientTwo =
clientFactory.getStorageClient(credentials);
+
+ assertSame(clientOne, clientTwo);
+ }
+
+ @Test
+ void testThatDifferentServiceClientIsReturnedForDifferentCredentials() {
+ final DataLakeServiceClientFactory clientFactory = new
DataLakeServiceClientFactory(logger, null);
+
+ final ADLSCredentialsDetails credentialsOne =
createCredentialDetails("accountOne");
+ final ADLSCredentialsDetails credentialsTwo =
createCredentialDetails("accountTwo");
+
+ final DataLakeServiceClient clientOne =
clientFactory.getStorageClient(credentialsOne);
+ final DataLakeServiceClient clientTwo =
clientFactory.getStorageClient(credentialsTwo);
+
+ assertNotSame(clientOne, clientTwo);
+ }
+
+ @Test
+ void testThatCachedClientIsReturnedAfterDifferentClientIsCreated() {
+ final DataLakeServiceClientFactory clientFactory = new
DataLakeServiceClientFactory(logger, null);
+
+ final ADLSCredentialsDetails credentialsOne =
createCredentialDetails("accountOne");
+ final ADLSCredentialsDetails credentialsTwo =
createCredentialDetails("accountTwo");
+ final ADLSCredentialsDetails credentialsThree =
createCredentialDetails("accountOne");
+
+ final DataLakeServiceClient clientOne =
clientFactory.getStorageClient(credentialsOne);
+ final DataLakeServiceClient clientTwo =
clientFactory.getStorageClient(credentialsTwo);
+ final DataLakeServiceClient clientThree =
clientFactory.getStorageClient(credentialsThree);
+
+ assertNotSame(clientOne, clientTwo);
+ assertSame(clientOne, clientThree);
+ }
+
+ private ADLSCredentialsDetails createCredentialDetails(String accountName)
{
+ return ADLSCredentialsDetails.Builder.newBuilder()
+ .setAccountName(accountName)
+ .setAccountKey("accountKey")
+ .setEndpointSuffix("dfs.core.windows.net")
+ .build();
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
index eb3b1237c0..0a831161e5 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
@@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
import com.azure.core.credential.AccessToken;
+import java.util.Objects;
+
public class ADLSCredentialsDetails {
private final String accountName;
@@ -98,6 +100,45 @@ public class ADLSCredentialsDetails {
return servicePrincipalClientSecret;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ADLSCredentialsDetails that = (ADLSCredentialsDetails) o;
+ return useManagedIdentity == that.useManagedIdentity
+ && Objects.equals(accountName, that.accountName)
+ && Objects.equals(accountKey, that.accountKey)
+ && Objects.equals(sasToken, that.sasToken)
+ && Objects.equals(endpointSuffix, that.endpointSuffix)
+ && Objects.equals(accessToken, that.accessToken)
+ && Objects.equals(managedIdentityClientId,
that.managedIdentityClientId)
+ && Objects.equals(servicePrincipalTenantId,
that.servicePrincipalTenantId)
+ && Objects.equals(servicePrincipalClientId,
that.servicePrincipalClientId)
+ && Objects.equals(servicePrincipalClientSecret,
that.servicePrincipalClientSecret);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ accountName,
+ accountKey,
+ sasToken,
+ endpointSuffix,
+ accessToken,
+ useManagedIdentity,
+ managedIdentityClientId,
+ servicePrincipalTenantId,
+ servicePrincipalClientId,
+ servicePrincipalClientSecret
+ );
+ }
+
public static class Builder {
private String accountName;
private String accountKey;