This is an automated email from the ASF dual-hosted git repository.
mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 03bba7049a NIFI-12672 Added AzureFileResourceService
03bba7049a is described below
commit 03bba7049abcb31e3551e7f9353fa9f5436d9e66
Author: Balázs Gerner <[email protected]>
AuthorDate: Thu Feb 1 15:51:00 2024 +0100
NIFI-12672 Added AzureFileResourceService
This closes #8359.
Signed-off-by: Mark Bathori <[email protected]>
---
.../azure/AbstractAzureBlobProcessor_v12.java | 11 +-
.../AbstractAzureDataLakeStorageProcessor.java | 117 +---------
.../azure/storage/CopyAzureBlobStorage_v12.java | 3 +-
.../azure/storage/DeleteAzureBlobStorage_v12.java | 3 +-
.../azure/storage/DeleteAzureDataLakeStorage.java | 22 +-
.../azure/storage/FetchAzureBlobStorage_v12.java | 3 +-
.../azure/storage/FetchAzureDataLakeStorage.java | 14 +-
.../azure/storage/ListAzureBlobStorage_v12.java | 8 +-
.../azure/storage/ListAzureDataLakeStorage.java | 14 +-
.../azure/storage/MoveAzureDataLakeStorage.java | 18 +-
.../azure/storage/PutAzureBlobStorage_v12.java | 3 +-
.../azure/storage/PutAzureDataLakeStorage.java | 16 +-
.../azure/storage/utils/AzureStorageUtils.java | 135 +++++++++++
.../AzureBlobStorageFileResourceService.java | 140 ++++++++++++
.../AzureDataLakeStorageFileResourceService.java | 150 ++++++++++++
.../org.apache.nifi.controller.ControllerService | 2 +
.../storage/AbstractAzureBlobStorage_v12IT.java | 3 +-
.../storage/AbstractAzureDataLakeStorageIT.java | 5 +-
.../storage/ITDeleteAzureDataLakeStorage.java | 5 +-
.../azure/storage/ITFetchAzureDataLakeStorage.java | 7 +-
.../azure/storage/ITListAzureDataLakeStorage.java | 71 +++---
.../azure/storage/ITMoveAzureDataLakeStorage.java | 9 +-
.../azure/storage/ITPutAzureDataLakeStorage.java | 23 +-
.../storage/TestAbstractAzureDataLakeStorage.java | 8 +-
.../AzureBlobStorageFileResourceServiceTest.java | 191 ++++++++++++++++
...zureDataLakeStorageFileResourceServiceTest.java | 251 +++++++++++++++++++++
26 files changed, 1008 insertions(+), 224 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
index 89fee5701c..c72e41fe62 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE;
@@ -52,14 +53,6 @@ import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor
{
- public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
- .name("storage-credentials-service")
- .displayName("Storage Credentials")
- .description("Controller Service used to obtain Azure Blob Storage
Credentials.")
-
.identifiesControllerService(AzureStorageCredentialsService_v12.class)
- .required(true)
- .build();
-
public static final PropertyDescriptor BLOB_NAME = new
PropertyDescriptor.Builder()
.name("blob-name")
.displayName("Blob Name")
@@ -98,7 +91,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends
AbstractProcessor {
}
protected BlobServiceClient getStorageClient(PropertyContext context,
FlowFile flowFile) {
- return getStorageClient(context, STORAGE_CREDENTIALS_SERVICE,
flowFile);
+ return getStorageClient(context, BLOB_STORAGE_CREDENTIALS_SERVICE,
flowFile);
}
protected BlobServiceClient getStorageClient(PropertyContext context,
PropertyDescriptor storageCredentialsServiceProperty, FlowFile flowFile) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index bb62bd5400..fd357b49a7 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -17,69 +17,25 @@
package org.apache.nifi.processors.azure;
import com.azure.storage.file.datalake.DataLakeServiceClient;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import
org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
-import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
public abstract class AbstractAzureDataLakeStorageProcessor extends
AbstractProcessor {
- public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
- .name("adls-credentials-service")
- .displayName("ADLS Credentials")
- .description("Controller Service used to obtain Azure
Credentials.")
- .identifiesControllerService(ADLSCredentialsService.class)
- .required(true)
- .build();
-
- public static final PropertyDescriptor FILESYSTEM = new
PropertyDescriptor.Builder()
- .name("filesystem-name").displayName("Filesystem Name")
- .description("Name of the Azure Storage File System (also called
Container). It is assumed to be already existing.")
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- public static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
- .name("directory-name")
- .displayName("Directory Name")
- .description("Name of the Azure Storage Directory. The Directory
Name cannot contain a leading '/'. The root directory can be designated by the
empty string value. " +
- "In case of the PutAzureDataLakeStorage processor, the
directory will be created if not already existing.")
- .addValidator(new DirectoryValidator())
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- public static final PropertyDescriptor FILE = new
PropertyDescriptor.Builder()
- .name("file-name").displayName("File Name")
- .description("The filename")
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
- .build();
-
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to Azure
storage are transferred to this relationship")
@@ -111,77 +67,12 @@ public abstract class
AbstractAzureDataLakeStorageProcessor extends AbstractProc
}
public DataLakeServiceClient getStorageClient(PropertyContext context,
FlowFile flowFile) {
- final Map<String, String> attributes = flowFile != null ?
flowFile.getAttributes() : Collections.emptyMap();
+ final Map<String, String> attributes = flowFile != null ?
flowFile.getAttributes() : Map.of();
- final ADLSCredentialsService credentialsService =
context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
+ final ADLSCredentialsService credentialsService =
context.getProperty(ADLS_CREDENTIALS_SERVICE)
+ .asControllerService(ADLSCredentialsService.class);
final ADLSCredentialsDetails credentialsDetails =
credentialsService.getCredentialsDetails(attributes);
return clientFactory.getStorageClient(credentialsDetails);
}
-
- public static String evaluateFileSystemProperty(ProcessContext context,
FlowFile flowFile) {
- return evaluateFileSystemProperty(context, flowFile, FILESYSTEM);
- }
-
- public static String evaluateFileSystemProperty(ProcessContext context,
FlowFile flowFile, PropertyDescriptor property) {
- String fileSystem =
context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isBlank(fileSystem)) {
- throw new ProcessException(String.format("'%1$s' property
evaluated to blank string. '%s' must be specified as a non-blank string.",
property.getDisplayName()));
- }
- return fileSystem;
- }
-
- public static String evaluateDirectoryProperty(ProcessContext context,
FlowFile flowFile) {
- return evaluateDirectoryProperty(context, flowFile, DIRECTORY);
- }
-
- public static String evaluateDirectoryProperty(ProcessContext context,
FlowFile flowFile, PropertyDescriptor property) {
- String directory =
context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
- if (directory.startsWith("/")) {
- throw new ProcessException(String.format("'%1$s' starts with '/'.
'%s' cannot contain a leading '/'.", property.getDisplayName()));
- } else if (StringUtils.isNotEmpty(directory) &&
StringUtils.isWhitespace(directory)) {
- throw new ProcessException(String.format("'%1$s' contains
whitespace characters only.", property.getDisplayName()));
- }
- return directory;
- }
-
- public static String evaluateFileNameProperty(ProcessContext context,
FlowFile flowFile) {
- String fileName =
context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isBlank(fileName)) {
- throw new ProcessException(String.format("'%1$s' property
evaluated to blank string. '%s' must be specified as a non-blank string.",
FILE.getDisplayName()));
- }
- return fileName;
- }
-
- public static class DirectoryValidator implements Validator {
- private String displayName;
-
- public DirectoryValidator() {
- this.displayName = null;
- }
-
- public DirectoryValidator(String displayName) {
- this.displayName = displayName;
- }
-
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- displayName = displayName == null ? DIRECTORY.getDisplayName() :
displayName;
- ValidationResult.Builder builder = new ValidationResult.Builder()
- .subject(displayName)
- .input(input);
-
- if (context.isExpressionLanguagePresent(input)) {
- builder.valid(true).explanation("Expression Language Present");
- } else if (input.startsWith("/")) {
- builder.valid(false).explanation(String.format("'%s' cannot
contain a leading '/'", displayName));
- } else if (StringUtils.isNotEmpty(input) &&
StringUtils.isWhitespace(input)) {
- builder.valid(false).explanation(String.format("'%s' cannot
contain whitespace characters only", displayName));
- } else {
- builder.valid(true);
- }
-
- return builder.build();
- }
- }
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
index a1c7b07010..91ad7005b9 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java
@@ -79,6 +79,7 @@ import static
com.azure.storage.blob.specialized.BlockBlobClient.MAX_STAGE_BLOCK
import static
com.azure.storage.blob.specialized.BlockBlobClient.MAX_UPLOAD_BLOB_BYTES_LONG;
import static com.azure.storage.common.implementation.Constants.STORAGE_SCOPE;
import static java.net.HttpURLConnection.HTTP_ACCEPTED;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -148,7 +149,7 @@ public class CopyAzureBlobStorage_v12 extends
AbstractAzureBlobProcessor_v12 {
.build();
public static final PropertyDescriptor
DESTINATION_STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
- .fromPropertyDescriptor(STORAGE_CREDENTIALS_SERVICE)
+ .fromPropertyDescriptor(BLOB_STORAGE_CREDENTIALS_SERVICE)
.displayName("Destination Storage Credentials")
.build();
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
index f241ffd05a..a8e889842b 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
@@ -38,6 +38,7 @@ import
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
@@ -73,7 +74,7 @@ public class DeleteAzureBlobStorage_v12 extends
AbstractAzureBlobProcessor_v12 {
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
- STORAGE_CREDENTIALS_SERVICE,
+ BLOB_STORAGE_CREDENTIALS_SERVICE,
CONTAINER,
BLOB_NAME,
DELETE_SNAPSHOTS_OPTION,
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 11d09514d6..080012600e 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -29,19 +29,22 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.time.Duration;
import java.util.List;
-import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class,
ListAzureDataLakeStorage.class})
@@ -62,12 +65,7 @@ public class DeleteAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProc
.build();
public static final PropertyDescriptor FILE = new
PropertyDescriptor.Builder()
- .name("file-name").displayName("File Name")
- .description("The filename")
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
+ .fromPropertyDescriptor(AzureStorageUtils.FILE)
.dependsOn(FILESYSTEM_OBJECT_TYPE, FS_TYPE_FILE)
.build();
@@ -90,14 +88,14 @@ public class DeleteAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProc
final boolean isFile =
context.getProperty(FILESYSTEM_OBJECT_TYPE).getValue().equals(FS_TYPE_FILE.getValue());
final DataLakeServiceClient storageClient =
getStorageClient(context, flowFile);
- final String fileSystem = evaluateFileSystemProperty(context,
flowFile);
+ final String fileSystem = evaluateFileSystemProperty(FILESYSTEM,
context, flowFile);
final DataLakeFileSystemClient fileSystemClient =
storageClient.getFileSystemClient(fileSystem);
- final String directory = evaluateDirectoryProperty(context,
flowFile);
+ final String directory = evaluateDirectoryProperty(DIRECTORY,
context, flowFile);
final DataLakeDirectoryClient directoryClient =
fileSystemClient.getDirectoryClient(directory);
if (isFile) {
- final String fileName = evaluateFileNameProperty(context,
flowFile);
+ final String fileName = evaluateFileProperty(context,
flowFile);
final DataLakeFileClient fileClient =
directoryClient.getFileClient(fileName);
fileClient.delete();
session.transfer(flowFile, REL_SUCCESS);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
index 45100a65c3..fb0757403e 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -140,7 +141,7 @@ public class FetchAzureBlobStorage_v12 extends
AbstractAzureBlobProcessor_v12 im
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
- STORAGE_CREDENTIALS_SERVICE,
+ BLOB_STORAGE_CREDENTIALS_SERVICE,
CONTAINER,
BLOB_NAME,
RANGE_START,
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 2018a90dc0..bd53b35561 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -47,6 +47,14 @@ import
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
+
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class,
ListAzureDataLakeStorage.class})
@CapabilityDescription("Fetch the specified file from Azure Data Lake Storage")
@@ -149,9 +157,9 @@ public class FetchAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProce
final DownloadRetryOptions retryOptions = new
DownloadRetryOptions();
retryOptions.setMaxRetryRequests(numRetries);
- final String fileSystem = evaluateFileSystemProperty(context,
flowFile);
- final String directory = evaluateDirectoryProperty(context,
flowFile);
- final String fileName = evaluateFileNameProperty(context,
flowFile);
+ final String fileSystem = evaluateFileSystemProperty(FILESYSTEM,
context, flowFile);
+ final String directory = evaluateDirectoryProperty(DIRECTORY,
context, flowFile);
+ final String fileName = evaluateFileProperty(context, flowFile);
final DataLakeServiceClient storageClient =
getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient =
storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient =
fileSystemClient.getDirectoryClient(directory);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
index 20aa86c0ac..6074dcf549 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
@@ -59,7 +59,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
@@ -135,7 +135,7 @@ public class ListAzureBlobStorage_v12 extends
AbstractListAzureProcessor<BlobInf
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
- STORAGE_CREDENTIALS_SERVICE,
+ BLOB_STORAGE_CREDENTIALS_SERVICE,
CONTAINER,
BLOB_NAME_PREFIX,
RECORD_WRITER,
@@ -202,7 +202,7 @@ public class ListAzureBlobStorage_v12 extends
AbstractListAzureProcessor<BlobInf
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor
property) {
- return STORAGE_CREDENTIALS_SERVICE.equals(property)
+ return BLOB_STORAGE_CREDENTIALS_SERVICE.equals(property)
|| CONTAINER.equals(property)
|| BLOB_NAME_PREFIX.equals(property)
|| LISTING_STRATEGY.equals(property);
@@ -217,7 +217,7 @@ public class ListAzureBlobStorage_v12 extends
AbstractListAzureProcessor<BlobInf
try {
final List<BlobInfo> listing = new ArrayList<>();
- final AzureStorageCredentialsService_v12 credentialsService =
context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
+ final AzureStorageCredentialsService_v12 credentialsService =
context.getProperty(BLOB_STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
final AzureStorageCredentialsDetails_v12 credentialsDetails =
credentialsService.getCredentialsDetails(Collections.emptyMap());
final BlobServiceClient storageClient =
clientFactory.getStorageClient(credentialsDetails);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index 425b86b4e9..f1a50ca4e9 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -62,12 +62,7 @@ import java.util.regex.Pattern;
import static
org.apache.nifi.processor.util.list.ListedEntityTracker.INITIAL_LISTING_TARGET;
import static
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
import static
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_TIME_WINDOW;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
@@ -82,6 +77,11 @@ import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
@PrimaryNodeOnly
@TriggerSerially
@@ -265,8 +265,8 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
private List<ADLSFileInfo> performListing(final ProcessContext context,
final Long minTimestamp, final ListingMode listingMode,
final boolean applyFilters)
throws IOException {
try {
- final String fileSystem = evaluateFileSystemProperty(context,
null);
- final String baseDirectory = evaluateDirectoryProperty(context,
null);
+ final String fileSystem = evaluateFileSystemProperty(FILESYSTEM,
context);
+ final String baseDirectory = evaluateDirectoryProperty(DIRECTORY,
context);
final boolean recurseSubdirectories =
context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
final Pattern filePattern = listingMode == ListingMode.EXECUTION ?
this.filePattern : getPattern(context, FILE_FILTER);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
index 4d4a11f9bc..d388ab3289 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
@@ -37,6 +37,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +58,13 @@ import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class,
ListAzureDataLakeStorage.class})
@@ -147,11 +155,11 @@ public class MoveAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProces
final long startNanos = System.nanoTime();
try {
- final String sourceFileSystem =
evaluateFileSystemProperty(context, flowFile, SOURCE_FILESYSTEM);
- final String sourceDirectory = evaluateDirectoryProperty(context,
flowFile, SOURCE_DIRECTORY);
- final String destinationFileSystem =
evaluateFileSystemProperty(context, flowFile, DESTINATION_FILESYSTEM);
- final String destinationDirectory =
evaluateDirectoryProperty(context, flowFile, DESTINATION_DIRECTORY);
- final String fileName = evaluateFileNameProperty(context,
flowFile);
+ final String sourceFileSystem =
evaluateFileSystemProperty(SOURCE_FILESYSTEM, context, flowFile);
+ final String sourceDirectory =
evaluateDirectoryProperty(SOURCE_DIRECTORY, context, flowFile);
+ final String destinationFileSystem =
evaluateFileSystemProperty(DESTINATION_FILESYSTEM, context, flowFile);
+ final String destinationDirectory =
evaluateDirectoryProperty(DESTINATION_DIRECTORY, context, flowFile);
+ final String fileName = evaluateFileProperty(context, flowFile);
final String destinationPath;
if (!destinationDirectory.isEmpty() && !sourceDirectory.isEmpty())
{
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index 3d2a36e67d..8f13f7e302 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM;
import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -104,7 +105,7 @@ import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR
public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12
implements ClientSideEncryptionSupport {
private static final List<PropertyDescriptor> PROPERTIES = List.of(
- STORAGE_CREDENTIALS_SERVICE,
+ BLOB_STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
AzureStorageUtils.CREATE_CONTAINER,
AzureStorageUtils.CONFLICT_RESOLUTION,
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 817435ed2f..7651ad6a99 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -39,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.StringUtils;
@@ -61,6 +62,13 @@ import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
@@ -128,11 +136,11 @@ public class PutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProcess
final long startNanos = System.nanoTime();
try {
- final String fileSystem = evaluateFileSystemProperty(context,
flowFile);
- final String originalDirectory =
evaluateDirectoryProperty(context, flowFile);
- final String tempPath = evaluateDirectoryProperty(context,
flowFile, BASE_TEMPORARY_PATH);
+ final String fileSystem = evaluateFileSystemProperty(FILESYSTEM,
context, flowFile);
+ final String originalDirectory =
evaluateDirectoryProperty(DIRECTORY, context, flowFile);
+ final String tempPath =
evaluateDirectoryProperty(BASE_TEMPORARY_PATH, context, flowFile);
final String tempDirectory = createPath(tempPath,
TEMP_FILE_DIRECTORY);
- final String fileName = evaluateFileNameProperty(context,
flowFile);
+ final String fileName = evaluateFileProperty(context, flowFile);
final DataLakeFileSystemClient fileSystemClient =
getFileSystemClient(context, flowFile, fileSystem);
final DataLakeDirectoryClient directoryClient =
fileSystemClient.getDirectoryClient(originalDirectory);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index ed79f4436d..7a0165d7de 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -17,16 +17,22 @@
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.http.ProxyOptions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.proxy.SocksVersion;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import
org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
+import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType;
import reactor.netty.http.client.HttpClient;
@@ -34,6 +40,9 @@ import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
public final class AzureStorageUtils {
public static final String STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME =
"storage-account-name";
@@ -41,6 +50,22 @@ public final class AzureStorageUtils {
public static final String STORAGE_SAS_TOKEN_PROPERTY_DESCRIPTOR_NAME =
"storage-sas-token";
public static final String
STORAGE_ENDPOINT_SUFFIX_PROPERTY_DESCRIPTOR_NAME = "storage-endpoint-suffix";
+ public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
+ .name("adls-credentials-service")
+ .displayName("ADLS Credentials")
+ .description("Controller Service used to obtain Azure
Credentials.")
+ .identifiesControllerService(ADLSCredentialsService.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor BLOB_STORAGE_CREDENTIALS_SERVICE =
new PropertyDescriptor.Builder()
+ .name("storage-credentials-service")
+ .displayName("Storage Credentials")
+ .description("Controller Service used to obtain Azure Blob Storage
Credentials.")
+
.identifiesControllerService(AzureStorageCredentialsService_v12.class)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor CREDENTIALS_TYPE = new
PropertyDescriptor.Builder()
.name("credentials-type")
.displayName("Credentials Type")
@@ -54,6 +79,33 @@ public final class AzureStorageUtils {
.defaultValue(AzureStorageCredentialsType.SAS_TOKEN)
.build();
+ public static final PropertyDescriptor FILESYSTEM = new
PropertyDescriptor.Builder()
+ .name("filesystem-name").displayName("Filesystem Name")
+ .description("Name of the Azure Storage File System (also called
Container). It is assumed to be already existing.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
+ .name("directory-name")
+ .displayName("Directory Name")
+ .description("Name of the Azure Storage Directory. The Directory
Name cannot contain a leading '/'. The root directory can be designated by the
empty string value. " +
+ "In case of the PutAzureDataLakeStorage processor, the
directory will be created if not already existing.")
+ .addValidator(new DirectoryValidator())
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE = new
PropertyDescriptor.Builder()
+ .name("file-name").displayName("File Name")
+ .description("The filename")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
+ .build();
+
public static final String ACCOUNT_KEY_BASE_DESCRIPTION =
"The storage account key. This is an admin-like password providing
access to every container in this account. It is recommended " +
"one uses Shared Access Signature (SAS) token, Managed Identity or
Service Principal instead for fine-grained control with policies.";
@@ -215,6 +267,57 @@ public final class AzureStorageUtils {
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
}
+ public static String evaluateFileSystemProperty(PropertyDescriptor
property, PropertyContext context) {
+ return evaluateFileSystemProperty(property, context, (Map<String,
String>) null);
+ }
+
+ public static String evaluateFileSystemProperty(PropertyDescriptor
property, PropertyContext context, FlowFile flowFile) {
+ return evaluateFileSystemProperty(property, context,
flowFile.getAttributes());
+ }
+
+ public static String evaluateFileSystemProperty(PropertyDescriptor
property, PropertyContext context, Map<String, String> attributes) {
+ final String fileSystem = evaluateProperty(property, context,
attributes);
+ if (StringUtils.isBlank(fileSystem)) {
+ throw new ProcessException(String.format("'%1$s' property
evaluated to blank string. '%s' must be specified as a non-blank string.",
+ property.getDisplayName()));
+ }
+ return fileSystem;
+ }
+
+ public static String evaluateDirectoryProperty(PropertyDescriptor
property, PropertyContext context) {
+ return evaluateDirectoryProperty(property, context, (Map<String,
String>) null);
+ }
+
+ public static String evaluateDirectoryProperty(PropertyDescriptor
property, PropertyContext context, FlowFile flowFile) {
+ return evaluateDirectoryProperty(property, context,
flowFile.getAttributes());
+ }
+
+ public static String evaluateDirectoryProperty(PropertyDescriptor
property, PropertyContext context, Map<String, String> attributes) {
+ final String directory = evaluateProperty(property, context,
attributes);
+ if (directory.startsWith("/")) {
+ throw new ProcessException(String.format("'%1$s' starts with '/'.
'%s' cannot contain a leading '/'.", property.getDisplayName()));
+ } else if (StringUtils.isNotEmpty(directory) &&
StringUtils.isWhitespace(directory)) {
+ throw new ProcessException(String.format("'%1$s' contains
whitespace characters only.", property.getDisplayName()));
+ }
+ return directory;
+ }
+
+ public static String evaluateFileProperty(PropertyContext context,
FlowFile flowFile) {
+ return evaluateFileProperty(context, flowFile.getAttributes());
+ }
+
+ public static String evaluateFileProperty(PropertyContext context,
Map<String, String> attributes) {
+ final String fileName = evaluateProperty(FILE, context, attributes);
+ if (StringUtils.isBlank(fileName)) {
+ throw new ProcessException(String.format("'%1$s' property
evaluated to blank string. '%s' must be specified as a non-blank string.",
FILE.getDisplayName()));
+ }
+ return fileName;
+ }
+
+ private static String evaluateProperty(PropertyDescriptor
propertyDescriptor, PropertyContext context, Map<String, String> attributes) {
+ return
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(attributes).getValue();
+ }
+
/**
*
* Creates the {@link ProxyOptions proxy options} that {@link HttpClient}
will use.
@@ -252,4 +355,36 @@ public final class AzureStorageUtils {
throw new IllegalArgumentException("Unsupported proxy type: " +
proxyConfiguration.getProxyType());
}
}
+
+ public static class DirectoryValidator implements Validator {
+ private String displayName;
+
+ public DirectoryValidator() {
+ this.displayName = null;
+ }
+
+ public DirectoryValidator(String displayName) {
+ this.displayName = displayName;
+ }
+
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ displayName = displayName == null ? DIRECTORY.getDisplayName() :
displayName;
+ ValidationResult.Builder builder = new ValidationResult.Builder()
+ .subject(displayName)
+ .input(input);
+
+ if (context.isExpressionLanguagePresent(input)) {
+ builder.valid(true).explanation("Expression Language Present");
+ } else if (input.startsWith("/")) {
+ builder.valid(false).explanation(String.format("'%s' cannot
contain a leading '/'", displayName));
+ } else if (StringUtils.isNotEmpty(input) &&
StringUtils.isWhitespace(input)) {
+ builder.valid(false).explanation(String.format("'%s' cannot
contain whitespace characters only", displayName));
+ } else {
+ builder.valid(true);
+ }
+
+ return builder.build();
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceService.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceService.java
new file mode 100644
index 0000000000..ef2b0fb1a4
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceService.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobStorageException;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
+import org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
+import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
+import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
+import static org.apache.nifi.util.StringUtils.isBlank;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "file", "resource", "blob"})
+@SeeAlso({FetchAzureBlobStorage_v12.class})
+@CapabilityDescription("Provides an Azure Blob Storage file resource for other
components.")
+@UseCase(
+ description = "Fetch a specific file from Azure Blob Storage." +
+ " The service provides higher performance compared to fetch
processors when the data should be moved between different storages without any
transformation.",
+ configuration = """
+ "Container Name" = "${azure.container}"
+ "Blob Name" = "${azure.blobname}"
+
+ The "Storage Credentials" property should specify an instance
of the AzureStorageCredentialsService_v12 in order to provide credentials for
accessing the storage container.
+ """
+)
+public class AzureBlobStorageFileResourceService extends
AbstractControllerService implements FileResourceService {
+
+ public static final PropertyDescriptor CONTAINER = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AzureStorageUtils.CONTAINER)
+ .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER))
+ .build();
+
+ public static final PropertyDescriptor BLOB_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME)
+ .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME))
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = List.of(
+ BLOB_STORAGE_CREDENTIALS_SERVICE,
+ CONTAINER,
+ BLOB_NAME
+ );
+
+ private volatile BlobServiceClientFactory clientFactory;
+ private volatile ConfigurationContext context;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.clientFactory = new BlobServiceClientFactory(getLogger(),
getProxyOptions(context));
+ this.context = context;
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ this.clientFactory = null;
+ this.context = null;
+ }
+
+ @Override
+ public FileResource getFileResource(Map<String, String> attributes) {
+ final BlobServiceClient client = getStorageClient(attributes);
+ try {
+ return fetchBlob(client, attributes);
+ } catch (final BlobStorageException | IOException e) {
+ throw new ProcessException("Failed to fetch blob from Azure Blob
Storage", e);
+ }
+ }
+
+ protected BlobServiceClient getStorageClient(Map<String, String>
attributes) {
+ final AzureStorageCredentialsService_v12 credentialsService =
context.getProperty(BLOB_STORAGE_CREDENTIALS_SERVICE)
+ .asControllerService(AzureStorageCredentialsService_v12.class);
+ return
clientFactory.getStorageClient(credentialsService.getCredentialsDetails(attributes));
+ }
+
+ /**
+ * Fetching blob from the provided container.
+ *
+ * @param storageClient azure blob storage client
+ * @param attributes configuration attributes
+ * @return fetched blob as FileResource
+ * @throws IOException exception caused by missing parameters or blob not
found
+ */
+ private FileResource fetchBlob(final BlobServiceClient storageClient,
final Map<String, String> attributes) throws IOException {
+ final String containerName =
context.getProperty(CONTAINER).evaluateAttributeExpressions(attributes).getValue();
+ final String blobName =
context.getProperty(BLOB_NAME).evaluateAttributeExpressions(attributes).getValue();
+
+ if (isBlank(containerName) || isBlank(blobName)) {
+ throw new ProcessException("Container name and blob name cannot be
empty");
+ }
+
+ final BlobContainerClient containerClient =
storageClient.getBlobContainerClient(containerName);
+ final BlobClient blobClient = containerClient.getBlobClient(blobName);
+ if (!blobClient.exists()) {
+ throw new ProcessException(String.format("Blob %s/%s not found",
containerName, blobName));
+ }
+ return new FileResource(blobClient.openInputStream(),
blobClient.getProperties().getBlobSize());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceService.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceService.java
new file mode 100644
index 0000000000..f9c9ae8188
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceService.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import
org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "file",
"resource", "datalake"})
+@SeeAlso({FetchAzureDataLakeStorage.class})
+@CapabilityDescription("Provides an Azure Data Lake Storage (ADLS) file
resource for other components.")
+@UseCase(
+ description = "Fetch the specified file from Azure Data Lake Storage."
+
+ " The service provides higher performance compared to fetch
processors when the data should be moved between different storages without any
transformation.",
+ configuration = """
+ "Filesystem Name" = "${azure.filesystem}"
+ "Directory Name" = "${azure.directory}"
+ "File Name" = "${azure.filename}"
+
+ The "ADLS Credentials" property should specify an instance of
the ADLSCredentialsService in order to provide credentials for accessing the
filesystem.
+ """
+)
+public class AzureDataLakeStorageFileResourceService extends
AbstractControllerService implements FileResourceService {
+
+ public static final PropertyDescriptor FILESYSTEM = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AzureStorageUtils.FILESYSTEM)
+ .defaultValue(String.format("${%s}", ATTR_NAME_FILESYSTEM))
+ .build();
+
+ public static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AzureStorageUtils.DIRECTORY)
+ .defaultValue(String.format("${%s}", ATTR_NAME_DIRECTORY))
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = List.of(
+ ADLS_CREDENTIALS_SERVICE,
+ FILESYSTEM,
+ DIRECTORY,
+ FILE
+ );
+
+ private volatile DataLakeServiceClientFactory clientFactory;
+ private volatile ConfigurationContext context;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.clientFactory = new DataLakeServiceClientFactory(getLogger(),
getProxyOptions(context));
+ this.context = context;
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ this.clientFactory = null;
+ this.context = null;
+ }
+
+ @Override
+ public FileResource getFileResource(Map<String, String> attributes) {
+ final DataLakeServiceClient client = getStorageClient(attributes);
+ try {
+ return fetchFile(client, attributes);
+ } catch (final DataLakeStorageException | IOException e) {
+ throw new ProcessException("Failed to fetch file from ADLS
Storage", e);
+ }
+ }
+
+ protected DataLakeServiceClient getStorageClient(Map<String, String>
attributes) {
+ final ADLSCredentialsService credentialsService =
context.getProperty(ADLS_CREDENTIALS_SERVICE)
+ .asControllerService(ADLSCredentialsService.class);
+ return
clientFactory.getStorageClient(credentialsService.getCredentialsDetails(attributes));
+ }
+
+ /**
+ * Fetching file from the provided filesystem and directory in ADLS.
+ *
+ * @param storageClient azure data lake service client
+ * @param attributes configuration attributes
+ * @return fetched file as FileResource
+ * @throws IOException exception caused by missing parameters or blob not
found
+ */
+ private FileResource fetchFile(final DataLakeServiceClient storageClient,
final Map<String, String> attributes) throws IOException {
+ final String fileSystem = evaluateFileSystemProperty(FILESYSTEM,
context, attributes);
+ final String directory = evaluateDirectoryProperty(DIRECTORY, context,
attributes);
+ final String file = evaluateFileProperty(context, attributes);
+
+ final DataLakeFileSystemClient fileSystemClient =
storageClient.getFileSystemClient(fileSystem);
+ final DataLakeDirectoryClient directoryClient =
fileSystemClient.getDirectoryClient(directory);
+ final DataLakeFileClient fileClient =
directoryClient.getFileClient(file);
+
+ if (fileClient.getProperties().isDirectory()) {
+ throw new ProcessException(FILE.getDisplayName() + " (" + file +
") points to a directory. Full path: " + fileClient.getFilePath());
+ }
+
+ if (!fileClient.exists()) {
+ throw new ProcessException(String.format("File %s/%s not found in
file system: %s", directory, file, fileSystem));
+ }
+
+ return new FileResource(fileClient.openInputStream().getInputStream(),
+ fileClient.getProperties().getFileSize());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 92f9ddbe5f..2823c4f3fc 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -21,3 +21,5 @@
org.apache.nifi.services.azure.data.explorer.StandardKustoIngestService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService
+org.apache.nifi.services.azure.storage.AzureBlobStorageFileResourceService
+org.apache.nifi.services.azure.storage.AzureDataLakeStorageFileResourceService
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
index 058ee2f0a7..1899dad4bc 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
@@ -51,6 +51,7 @@ import java.util.Map;
import java.util.UUID;
import static
org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
public abstract class AbstractAzureBlobStorage_v12IT extends
AbstractAzureStorageIT {
protected static final String SERVICE_ID = "credentials-service";
@@ -90,7 +91,7 @@ public abstract class AbstractAzureBlobStorage_v12IT extends
AbstractAzureStorag
runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY,
getAccountKey());
runner.enableControllerService(service);
-
runner.setProperty(AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE,
SERVICE_ID);
+ runner.setProperty(BLOB_STORAGE_CREDENTIALS_SERVICE, SERVICE_ID);
}
@BeforeEach
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index e683e6f697..8e94485a1f 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -23,7 +23,6 @@ import
com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
@@ -58,14 +57,14 @@ public abstract class AbstractAzureDataLakeStorageIT
extends AbstractAzureStorag
runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY,
getAccountKey());
runner.enableControllerService(service);
-
runner.setProperty(AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE,
"ADLSCredentials");
+ runner.setProperty(AzureStorageUtils.ADLS_CREDENTIALS_SERVICE,
"ADLSCredentials");
}
@BeforeEach
public void setUpAzureDataLakeStorageIT() {
fileSystemName = String.format("%s-%s", FILESYSTEM_NAME_PREFIX,
UUID.randomUUID());
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
fileSystemName);
+ runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystemName);
DataLakeServiceClient storageClient = createStorageClient();
fileSystemClient = storageClient.createFileSystem(fileSystemName);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
index 0443e78ac5..fc30522ef6 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
@@ -21,6 +21,7 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@@ -460,8 +461,8 @@ public class ITDeleteAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT
private void setRunnerProperties(String fileSystem, String directory,
String filename) {
runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM_OBJECT_TYPE,
filename != null ? FS_TYPE_FILE : FS_TYPE_DIRECTORY);
- runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem);
- runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory);
+ runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystem);
+ runner.setProperty(AzureStorageUtils.DIRECTORY, directory);
if (filename != null) {
runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index 54403b6183..767ad58d15 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -518,9 +519,9 @@ public class ITFetchAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT
}
private void setRunnerProperties(String fileSystem, String directory,
String filename, String rangeStart, String rangeLength) {
- runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, fileSystem);
- runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, directory);
- runner.setProperty(FetchAzureDataLakeStorage.FILE, filename);
+ runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystem);
+ runner.setProperty(AzureStorageUtils.DIRECTORY, directory);
+ runner.setProperty(AzureStorageUtils.FILE, filename);
if (rangeStart != null) {
runner.setProperty(FetchAzureDataLakeStorage.RANGE_START,
rangeStart);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
index f4259e5605..abfd5f5feb 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
@@ -99,7 +100,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootRecursive() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runProcessor();
@@ -108,7 +109,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootRecursiveWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
runProcessor();
@@ -122,7 +123,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootRecursiveUsingProxyConfigurationService() throws
Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
configureProxyService();
runProcessor();
@@ -132,7 +133,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootNonRecursive() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
runProcessor();
@@ -142,7 +143,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootNonRecursiveWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -153,7 +154,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryRecursive() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runProcessor();
@@ -162,7 +163,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryRecursiveWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
runProcessor();
@@ -174,7 +175,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryNonRecursive() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
runProcessor();
@@ -184,7 +185,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryNonRecursiveWithTempFiles() throws
Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -195,7 +196,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilter() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runProcessor();
@@ -205,7 +206,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilterWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -219,7 +220,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilterWithEL() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER,
".*file${suffix}$");
runner.setEnvironmentVariableValue("suffix", "1.*");
@@ -230,7 +231,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilterWithELWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER,
".*file${suffix}$");
runner.setEnvironmentVariableValue("suffix", "1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -245,7 +246,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilter() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runProcessor();
@@ -255,7 +256,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilterWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -268,7 +269,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilterWithEL() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER,
"${prefix}${suffix}");
runner.setEnvironmentVariableValue("prefix", "^dir");
runner.setEnvironmentVariableValue("suffix", "1.*$");
@@ -280,7 +281,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilterWithELWithTempFiles() throws
Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER,
"${prefix}${suffix}");
runner.setEnvironmentVariableValue("prefix", "^dir");
runner.setEnvironmentVariableValue("suffix", "1.*$");
@@ -295,7 +296,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryWithPathFilter() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runProcessor();
@@ -305,7 +306,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryWithPathFilterWithTempFiles() throws
Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -316,7 +317,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithFileAndPathFilter() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
@@ -327,7 +328,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithFileAndPathFilterWithTempFiles() throws
Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -340,7 +341,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListEmptyDirectory() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir3");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir3");
runProcessor();
@@ -349,7 +350,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListNonExistingDirectory() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dummy");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dummy");
runProcessor();
@@ -358,8 +359,8 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithNonExistingFileSystem() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
"dummy");
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runProcessor();
@@ -368,7 +369,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithRecords() throws InitializationException {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
@@ -384,7 +385,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithRecordsWithTempFiles() throws
InitializationException {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
@@ -402,7 +403,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinAge() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
runProcessor();
@@ -412,7 +413,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinAgeWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -423,7 +424,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxAge() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
runProcessor();
@@ -433,7 +434,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxAgeWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -448,7 +449,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinSize() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
runProcessor();
@@ -458,7 +459,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinSizeWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
@@ -472,7 +473,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxSize() {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
runProcessor();
@@ -482,7 +483,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxSizeWithTempFiles() throws Exception {
- runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
index 37faa6fd9b..751ff36f4c 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
@@ -20,6 +20,7 @@ import
com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
@@ -66,7 +67,7 @@ public class ITMoveAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY,
SOURCE_DIRECTORY);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM,
fileSystemName);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY,
DESTINATION_DIRECTORY);
- runner.setProperty(MoveAzureDataLakeStorage.FILE, FILE_NAME);
+ runner.setProperty(AzureStorageUtils.FILE, FILE_NAME);
}
@Test
@@ -187,7 +188,7 @@ public class ITMoveAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
public void testMoveFileWithInvalidFileName() {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
- runner.setProperty(MoveAzureDataLakeStorage.FILE, "/file1");
+ runner.setProperty(AzureStorageUtils.FILE, "/file1");
runProcessor(FILE_DATA);
@@ -203,7 +204,7 @@ public class ITMoveAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY,
sourceDirectory);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY,
destinationDirectory);
- runner.setProperty(MoveAzureDataLakeStorage.FILE, fileName);
+ runner.setProperty(AzureStorageUtils.FILE, fileName);
runProcessor(FILE_DATA);
@@ -296,7 +297,7 @@ public class ITMoveAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
private void setELProperties() {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM,
String.format("${%s}", EL_FILESYSTEM));
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY,
String.format("${%s}", EL_DIRECTORY));
- runner.setProperty(MoveAzureDataLakeStorage.FILE,
String.format("${%s}", EL_FILE_NAME));
+ runner.setProperty(AzureStorageUtils.FILE, String.format("${%s}",
EL_FILE_NAME));
}
private void runProcessor(byte[] fileData) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 9d22d2f88c..4a59a9b3a4 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.fileresource.service.StandardFileResourceService;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.transfer.ResourceTransferProperties;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -64,8 +65,8 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@BeforeEach
public void setUp() {
- runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, DIRECTORY);
- runner.setProperty(PutAzureDataLakeStorage.FILE, FILE_NAME);
+ runner.setProperty(AzureStorageUtils.DIRECTORY, DIRECTORY);
+ runner.setProperty(AzureStorageUtils.FILE, FILE_NAME);
}
@Test
@@ -121,7 +122,7 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
String baseDirectory = "dir1/dir2";
String fullDirectory = baseDirectory + "/dir3/dir4";
fileSystemClient.createDirectory(baseDirectory);
- runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, fullDirectory);
+ runner.setProperty(AzureStorageUtils.DIRECTORY, fullDirectory);
runProcessor(FILE_DATA);
@@ -131,7 +132,7 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileToRootDirectory() throws Exception {
String rootDirectory = "";
- runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, rootDirectory);
+ runner.setProperty(AzureStorageUtils.DIRECTORY, rootDirectory);
runProcessor(FILE_DATA);
@@ -160,7 +161,7 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileWithNonExistingFileSystem() {
- runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, "dummy");
+ runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy");
runProcessor(FILE_DATA);
@@ -169,7 +170,7 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileWithInvalidFileName() {
- runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1");
+ runner.setProperty(AzureStorageUtils.FILE, "/file1");
runProcessor(FILE_DATA);
@@ -180,8 +181,8 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
public void testPutFileWithSpacesInDirectoryAndFileName() throws Exception
{
String directory = "dir 1";
String fileName = "file 1";
- runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, directory);
- runner.setProperty(PutAzureDataLakeStorage.FILE, fileName);
+ runner.setProperty(AzureStorageUtils.DIRECTORY, directory);
+ runner.setProperty(AzureStorageUtils.FILE, fileName);
runProcessor(FILE_DATA);
@@ -290,9 +291,9 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
}
private void setELProperties() {
- runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM,
String.format("${%s}", EL_FILESYSTEM));
- runner.setProperty(PutAzureDataLakeStorage.DIRECTORY,
String.format("${%s}", EL_DIRECTORY));
- runner.setProperty(PutAzureDataLakeStorage.FILE,
String.format("${%s}", EL_FILE_NAME));
+ runner.setProperty(AzureStorageUtils.FILESYSTEM,
String.format("${%s}", EL_FILESYSTEM));
+ runner.setProperty(AzureStorageUtils.DIRECTORY, String.format("${%s}",
EL_DIRECTORY));
+ runner.setProperty(AzureStorageUtils.FILE, String.format("${%s}",
EL_FILE_NAME));
}
private void runProcessor(byte[] fileData) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
index 4bf683fb20..68b1cf180f 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
@@ -22,10 +22,10 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE;
-import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceServiceTest.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceServiceTest.java
new file mode 100644
index 0000000000..30193c3aa0
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceServiceTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.services.azure.storage;
+
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.specialized.BlobInputStream;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.azure.AzureCredentialsService;
+import
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Map;
+
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AzureBlobStorageFileResourceServiceTest {
+ private static final String CONTROLLER_SERVICE = "AzureCredentialsService";
+ private static final String CONTAINER = "container-name";
+ private static final String BLOB_NAME = "test-file";
+ private static final long CONTENT_LENGTH = 10L;
+
+ @Mock
+ private BlobServiceClient client;
+
+ @Mock
+ private BlobContainerClient containerClient;
+
+ @Mock
+ private BlobClient blobClient;
+
+ @Mock
+ private BlobProperties blobProperties;
+
+ @Mock
+ private BlobInputStream blobInputStream;
+
+ @InjectMocks
+ private TestAzureBlobStorageFileResourceService service;
+
+ private TestRunner runner;
+
+ @BeforeEach
+ void setup() throws InitializationException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.addControllerService("AzureBlobStorageFileResourceService",
service);
+ }
+
+ @Test
+ void testValidBlob() throws InitializationException {
+ setupService();
+ setupMocking(CONTAINER, BLOB_NAME);
+
+ final FileResource fileResource = service.getFileResource(Map.of());
+
+ assertFileResource(fileResource);
+ verifyMockInvocations(CONTAINER, BLOB_NAME);
+ }
+
+ @Test
+ void testValidBlobWithEL() throws InitializationException {
+ String customContainer = "custom-container";
+ String customBlobName = "custom-blob-name";
+ String blobKey = "blob.name";
+ String containerKey = "container.name";
+ setupService(String.format("${%s}", blobKey), String.format("${%s}",
containerKey));
+ setupMocking(customContainer, customBlobName);
+ runner.setValidateExpressionUsage(false);
+
+ final FileResource fileResource = service.getFileResource(Map.of(
+ blobKey, customBlobName,
+ containerKey, customContainer));
+
+ assertFileResource(fileResource);
+ verifyMockInvocations(customContainer, customBlobName);
+ }
+
+ @Test
+ void testNonExistingBlob() throws InitializationException {
+ setupService();
+
when(client.getBlobContainerClient(CONTAINER)).thenReturn(containerClient);
+ when(containerClient.getBlobClient(BLOB_NAME)).thenReturn(blobClient);
+ when(blobClient.exists()).thenReturn(false);
+
+ assertThrows(ProcessException.class,
+ () -> service.getFileResource(Map.of()),
+ "Failed to fetch blob from Azure Blob Storage");
+ }
+
+ @Test
+ void testELWithMissingAttribute() throws InitializationException {
+ runner.setValidateExpressionUsage(false);
+
+ setupService(String.format("${%s}", BLOB_NAME), String.format("${%s}",
CONTAINER));
+
+ assertThrows(ProcessException.class,
+ () -> service.getFileResource(Map.of()),
+ "Container name and blob name cannot be empty");
+ }
+
+ private void setupService() throws InitializationException {
+ setupService(BLOB_NAME, CONTAINER);
+ }
+
+ private void setupService(String blobName, String container) throws
InitializationException {
+ final AzureCredentialsService credentialsService = new
StandardAzureCredentialsControllerService();
+
+ runner.addControllerService(CONTROLLER_SERVICE, credentialsService);
+ runner.enableControllerService(credentialsService);
+
+ runner.setProperty(service, BLOB_STORAGE_CREDENTIALS_SERVICE,
CONTROLLER_SERVICE);
+ runner.setProperty(service,
AzureBlobStorageFileResourceService.BLOB_NAME, blobName);
+ runner.setProperty(service,
AzureBlobStorageFileResourceService.CONTAINER, container);
+
+ runner.enableControllerService(service);
+ }
+
+ private void setupMocking(String container, String blobName) {
+
when(client.getBlobContainerClient(container)).thenReturn(containerClient);
+ when(containerClient.getBlobClient(blobName)).thenReturn(blobClient);
+ when(blobClient.exists()).thenReturn(true);
+ when(blobClient.getProperties()).thenReturn(blobProperties);
+ when(blobProperties.getBlobSize()).thenReturn(CONTENT_LENGTH);
+ when(blobClient.openInputStream()).thenReturn(blobInputStream);
+ }
+
+ private void assertFileResource(FileResource fileResource) {
+ assertNotNull(fileResource);
+ assertEquals(fileResource.getInputStream(), blobInputStream);
+ assertEquals(fileResource.getSize(), CONTENT_LENGTH);
+ }
+
+ private void verifyMockInvocations(String customContainer, String
customBlobName) {
+ verify(client).getBlobContainerClient(customContainer);
+ verify(containerClient).getBlobClient(customBlobName);
+ verify(blobClient).exists();
+ verify(blobClient).getProperties();
+ verify(blobProperties).getBlobSize();
+ verify(blobClient).openInputStream();
+ verifyNoMoreInteractions(containerClient, blobClient, blobProperties);
+ }
+
+ private static class TestAzureBlobStorageFileResourceService extends
AzureBlobStorageFileResourceService {
+
+ private final BlobServiceClient client;
+
+ public TestAzureBlobStorageFileResourceService(BlobServiceClient
client) {
+ this.client = client;
+ }
+
+ @Override
+ protected BlobServiceClient getStorageClient(Map<String, String>
attributes) {
+ return client;
+ }
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceServiceTest.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceServiceTest.java
new file mode 100644
index 0000000000..b879893875
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceServiceTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import
com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
+import com.azure.storage.file.datalake.models.PathProperties;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AzureDataLakeStorageFileResourceServiceTest {
+ private static final String CREDENTIALS_CONTROLLER_SERVICE =
"ADLSCredentialsService";
+ private static final String FILE_SYSTEM = "filesystem-name";
+ private static final String DIRECTORY = "test-directory";
+ private static final String FILE = "test-file";
+ private static final long CONTENT_LENGTH = 10L;
+ public static final String MSG_EMPTY_FILE_NAME = "'File Name' property
evaluated to blank string. 'File Name' must be specified as a non-blank
string.";
+ public static final String MSG_EMPTY_FILE_SYSTEM_NAME = "'Filesystem Name'
property evaluated to blank string. 'Filesystem Name' must be specified as a
non-blank string.";
+
+ @Mock
+ private DataLakeServiceClient client;
+
+ @Mock
+ private DataLakeFileSystemClient fileSystemClient;
+
+ @Mock
+ private DataLakeDirectoryClient directoryClient;
+
+ @Mock
+ private DataLakeFileClient fileClient;
+
+ @Mock
+ private PathProperties properties;
+
+ @Mock
+ private InputStream inputStream;
+
+ @InjectMocks
+ private TestAzureDataLakeStorageFileResourceService service;
+
+ private TestRunner runner;
+
+ @BeforeEach
+ void setup() throws InitializationException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.addControllerService("AzureDataLakeStorageFileResourceService",
service);
+ }
+
+ @Test
+ void testHappyPath() throws InitializationException {
+ setupService();
+ setupMocking();
+
+ FileResource fileResource = service.getFileResource(Map.of());
+
+ assertFileResource(fileResource);
+ verifyMockInvocations();
+ }
+
+ @Test
+ void testHappyPathWithValidEL() throws InitializationException {
+ String fileSystemKey = "filesystem.name";
+ String directoryKey = "directory";
+ String fileNameKey = "filename";
+ setupService("${" + fileSystemKey + "}", "${" + directoryKey + "}",
"${" + fileNameKey + "}");
+ setupMocking();
+
+ FileResource fileResource = service.getFileResource(Map.of(
+ fileSystemKey, FILE_SYSTEM,
+ directoryKey, DIRECTORY,
+ fileNameKey, FILE));
+
+ assertFileResource(fileResource);
+ verifyMockInvocations();
+ }
+
+ @Test
+ void testFileIsDirectory() throws InitializationException {
+ setupService();
+
when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient);
+
when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient);
+ when(directoryClient.getFileClient(FILE)).thenReturn(fileClient);
+ when(fileClient.getProperties()).thenReturn(properties);
+ when(properties.isDirectory()).thenReturn(true);
+
+ executeAndAssertProcessException(Map.of(), "File Name (" + FILE + ")
points to a directory. Full path: " + fileClient.getFilePath());
+ }
+
+ @Test
+ void testNonExistentFile() throws InitializationException {
+ setupService();
+
when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient);
+
when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient);
+ when(directoryClient.getFileClient(FILE)).thenReturn(fileClient);
+ when(fileClient.getProperties()).thenReturn(properties);
+ when(properties.isDirectory()).thenReturn(false);
+ when(fileClient.exists()).thenReturn(false);
+
+ executeAndAssertProcessException(Map.of(), "File " + DIRECTORY + "/" +
FILE + " not found in file system: " + FILE_SYSTEM);
+ }
+
+ @Test
+ void testInvalidDirectoryValueWithLeadingSlash() throws
InitializationException {
+ String directoryKey = "directory.name";
+ String directoryValue = "/invalid-directory";
+ setupService(FILE_SYSTEM, "${" + directoryKey + "}", FILE);
+
+ executeAndAssertProcessException(Map.of(directoryKey, directoryValue),
"'Directory Name' starts with '/'. 'Directory Name' cannot contain a leading
'/'.");
+ }
+
+ @Test
+ void testValidELWithMissingFileValue() throws InitializationException {
+ setupService(FILE_SYSTEM, DIRECTORY, "${file.name}");
+
+ executeAndAssertProcessException(Map.of(), MSG_EMPTY_FILE_NAME);
+ }
+
+ @Test
+ void testInvalidFileSystem() throws InitializationException {
+ String fileSystemKey = "fileSystem";
+ String fileSystemValue = " ";
+ setupService("${" + fileSystemKey + "}", DIRECTORY, FILE);
+
+ executeAndAssertProcessException(Map.of(fileSystemKey,
fileSystemValue), MSG_EMPTY_FILE_SYSTEM_NAME);
+ }
+
+ @Test
+ void testInvalidFileName() throws InitializationException {
+ String fileKey = "fileSystem";
+ String fileValue = " ";
+ setupService(FILE_SYSTEM, DIRECTORY, "${" + fileKey + "}");
+
+ executeAndAssertProcessException(Map.of(fileKey, fileValue),
+ MSG_EMPTY_FILE_NAME);
+ }
+
+ @Test
+ void testInvalidDirectoryValueWithWhiteSpaceOnly() throws
InitializationException {
+ String directoryKey = "directory.name";
+ String directoryValue = " ";
+ setupService(FILE_SYSTEM, "${" + directoryKey + "}", FILE);
+
+ executeAndAssertProcessException(Map.of(directoryKey, directoryValue),
"'Directory Name' contains whitespace characters only.");
+ }
+
+ private void setupService() throws InitializationException {
+ setupService(FILE_SYSTEM, DIRECTORY, FILE);
+ }
+
+ private void setupService(String fileSystem, String directory, String
fileName) throws InitializationException {
+ final ADLSCredentialsService credentialsService =
mock(ADLSCredentialsService.class);
+
when(credentialsService.getIdentifier()).thenReturn(CREDENTIALS_CONTROLLER_SERVICE);
+ runner.addControllerService(CREDENTIALS_CONTROLLER_SERVICE,
credentialsService);
+ runner.enableControllerService(credentialsService);
+
+ runner.setProperty(service, ADLS_CREDENTIALS_SERVICE,
CREDENTIALS_CONTROLLER_SERVICE);
+ runner.setProperty(service, AzureStorageUtils.FILESYSTEM, fileSystem);
+ runner.setProperty(service, AzureStorageUtils.DIRECTORY, directory);
+ runner.setProperty(service, AzureStorageUtils.FILE, fileName);
+
+ runner.enableControllerService(service);
+ }
+
+ private void setupMocking() {
+
when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient);
+
when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient);
+ when(directoryClient.getFileClient(FILE)).thenReturn(fileClient);
+ when(fileClient.getProperties()).thenReturn(properties);
+ when(properties.isDirectory()).thenReturn(false);
+ when(fileClient.exists()).thenReturn(true);
+ when(properties.getFileSize()).thenReturn(CONTENT_LENGTH);
+ DataLakeFileOpenInputStreamResult result =
mock(DataLakeFileOpenInputStreamResult.class);
+ when(fileClient.openInputStream()).thenReturn(result);
+ when(result.getInputStream()).thenReturn(inputStream);
+ }
+
+ private void executeAndAssertProcessException(Map<String, String>
arguments, String expectedMessage) {
+ ProcessException exception = assertThrows(ProcessException.class,
+ () -> service.getFileResource(arguments));
+ assertEquals(expectedMessage, exception.getMessage());
+ }
+
+ private void assertFileResource(FileResource fileResource) {
+ assertNotNull(fileResource);
+ assertEquals(fileResource.getInputStream(), inputStream);
+ assertEquals(fileResource.getSize(), CONTENT_LENGTH);
+ }
+
+ private void verifyMockInvocations() {
+ verify(client).getFileSystemClient(FILE_SYSTEM);
+ verify(fileSystemClient).getDirectoryClient(DIRECTORY);
+ verify(directoryClient).getFileClient(FILE);
+ verify(properties).isDirectory();
+ verify(fileClient).exists();
+ verify(fileClient).openInputStream();
+ verify(properties).getFileSize();
+ }
+
+ private static class TestAzureDataLakeStorageFileResourceService extends
AzureDataLakeStorageFileResourceService {
+ private final DataLakeServiceClient client;
+
+ private
TestAzureDataLakeStorageFileResourceService(DataLakeServiceClient client) {
+ this.client = client;
+ }
+
+ @Override
+ protected DataLakeServiceClient getStorageClient(Map<String, String>
attributes) {
+ return client;
+ }
+ }
+}