This is an automated email from the ASF dual-hosted git repository.
jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 22e8901fce NIFI-8248 Modified PutAzureDataLakeStorage processor to use
temp file instead of inline replacement
22e8901fce is described below
commit 22e8901fced6f56d9913cec240458e3268449873
Author: Timea Barna <[email protected]>
AuthorDate: Mon Jun 27 07:50:29 2022 +0200
NIFI-8248 Modified PutAzureDataLakeStorage processor to use temp file
instead of inline replacement
This closes #6159
Signed-off-by: Joey Frazee <[email protected]>
---
.../AbstractAzureDataLakeStorageProcessor.java | 2 +
.../azure/storage/ListAzureDataLakeStorage.java | 13 ++
.../azure/storage/PutAzureDataLakeStorage.java | 103 +++++++---
.../additionalDetails.html | 24 +--
.../azure/storage/ITListAzureDataLakeStorage.java | 220 ++++++++++++++++++++-
.../azure/storage/ITPutAzureDataLakeStorage.java | 46 ++++-
6 files changed, 365 insertions(+), 43 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 3047de5f6e..1a66dda500 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -101,6 +101,8 @@ public abstract class AbstractAzureDataLakeStorageProcessor
extends AbstractProc
REL_FAILURE
)));
+ public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
+
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index cc43ffd567..24f51b5ae2 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -63,6 +63,7 @@ import static
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_T
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
+import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
import static
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
@@ -129,6 +130,15 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor INCLUDE_TEMPORARY_FILES = new
PropertyDescriptor.Builder()
+ .name("include-temporary-files")
+ .displayName("Include Temporary Files")
+ .description("Whether to include temporary files when listing the
contents of configured directory paths.")
+ .required(true)
+ .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+ .defaultValue(Boolean.FALSE.toString())
+ .build();
+
private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
@@ -136,6 +146,7 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
RECURSE_SUBDIRECTORIES,
FILE_FILTER,
PATH_FILTER,
+ INCLUDE_TEMPORARY_FILES,
RECORD_WRITER,
LISTING_STRATEGY,
TRACKING_STATE_CACHE,
@@ -261,10 +272,12 @@ public class ListAzureDataLakeStorage extends
AbstractListAzureProcessor<ADLSFil
options.setRecursive(recurseSubdirectories);
final Pattern baseDirectoryPattern = Pattern.compile("^" +
baseDirectory + "/?");
+ final boolean includeTempFiles =
context.getProperty(INCLUDE_TEMPORARY_FILES).asBoolean();
final long minimumTimestamp = minTimestamp == null ? 0 :
minTimestamp;
final List<ADLSFileInfo> listing =
fileSystemClient.listPaths(options, null).stream()
.filter(pathItem -> !pathItem.isDirectory())
+ .filter(pathItem -> includeTempFiles ||
!pathItem.getName().contains(TEMP_FILE_DIRECTORY))
.filter(pathItem ->
isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp,
pathItem.getLastModified().toInstant().toEpochMilli(),
pathItem.getContentLength()))
.map(pathItem -> new ADLSFileInfo.Builder()
.fileSystem(fileSystem)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 3c10d068b0..dbd6138e15 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -20,6 +20,7 @@ import
com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -30,20 +31,24 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.StringUtils;
import java.io.BufferedInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
@@ -83,11 +88,23 @@ public class PutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProcess
.allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION,
IGNORE_RESOLUTION)
.build();
+ public static final PropertyDescriptor BASE_TEMPORARY_PATH = new
PropertyDescriptor.Builder()
+ .name("base-temporary-path")
+ .displayName("Base Temporary Path")
+ .description("The Path where the temporary directory will be
created. The Path name cannot contain a leading '/'." +
+ " The root directory can be designated by the empty string
value. Non-existing directories will be created." +
+ " The Temporary File Directory name is " +
TEMP_FILE_DIRECTORY)
+ .defaultValue("")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(new DirectoryValidator("Base Temporary Path"))
+ .build();
+
private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
DIRECTORY,
FILE,
+ BASE_TEMPORARY_PATH,
CONFLICT_RESOLUTION,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@@ -107,41 +124,39 @@ public class PutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProcess
final long startNanos = System.nanoTime();
try {
final String fileSystem = evaluateFileSystemProperty(context,
flowFile);
- final String directory = evaluateDirectoryProperty(context,
flowFile);
+ final String originalDirectory =
evaluateDirectoryProperty(context, flowFile);
+ final String tempPath = evaluateDirectoryProperty(context,
flowFile, BASE_TEMPORARY_PATH);
+ final String tempDirectory = createPath(tempPath,
TEMP_FILE_DIRECTORY);
final String fileName = evaluateFileNameProperty(context,
flowFile);
final DataLakeServiceClient storageClient =
getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient =
storageClient.getFileSystemClient(fileSystem);
- final DataLakeDirectoryClient directoryClient =
fileSystemClient.getDirectoryClient(directory);
- final DataLakeFileClient fileClient;
+ final DataLakeDirectoryClient directoryClient =
fileSystemClient.getDirectoryClient(originalDirectory);
+ final DataLakeFileClient tempFileClient;
+ final DataLakeFileClient renamedFileClient;
+ final String tempFilePrefix = UUID.randomUUID().toString();
+ final DataLakeDirectoryClient tempDirectoryClient =
fileSystemClient.getDirectoryClient(tempDirectory);
final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
try {
- fileClient = directoryClient.createFile(fileName, overwrite);
-
- final long length = flowFile.getSize();
- if (length > 0) {
- try (final InputStream rawIn = session.read(flowFile);
final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
- uploadContent(fileClient, bufferedIn, length);
- } catch (Exception e) {
- removeTempFile(fileClient);
- throw e;
- }
- }
+ tempFileClient = tempDirectoryClient.createFile(tempFilePrefix
+ fileName, true);
+ appendContent(flowFile, tempFileClient, session);
+ createDirectoryIfNotExists(directoryClient);
+ renamedFileClient = renameFile(fileName,
directoryClient.getDirectoryPath(), tempFileClient, overwrite);
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
- attributes.put(ATTR_NAME_DIRECTORY, directory);
+ attributes.put(ATTR_NAME_DIRECTORY, originalDirectory);
attributes.put(ATTR_NAME_FILENAME, fileName);
- attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl());
- attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+ attributes.put(ATTR_NAME_PRIMARY_URI,
renamedFileClient.getFileUrl());
+ attributes.put(ATTR_NAME_LENGTH,
String.valueOf(flowFile.getSize()));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile,
fileClient.getFileUrl(), transferMillis);
+ session.getProvenanceReporter().send(flowFile,
renamedFileClient.getFileUrl(), transferMillis);
} catch (DataLakeStorageException dlsException) {
if (dlsException.getStatusCode() == 409) {
if (conflictResolution.equals(IGNORE_RESOLUTION)) {
@@ -164,14 +179,26 @@ public class PutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProcess
}
}
- private void removeTempFile(DataLakeFileClient fileClient) {
- try {
- fileClient.delete();
- } catch (Exception e) {
- getLogger().error("Error while removing temp file on Azure Data
Lake Storage", e);
+ private void createDirectoryIfNotExists(DataLakeDirectoryClient
directoryClient) {
+ if (!directoryClient.getDirectoryPath().isEmpty() &&
!directoryClient.exists()) {
+ directoryClient.create();
+ }
+ }
+
+ //Visible for testing
+ void appendContent(FlowFile flowFile, DataLakeFileClient fileClient,
ProcessSession session) throws IOException {
+ final long length = flowFile.getSize();
+ if (length > 0) {
+ try (final InputStream rawIn = session.read(flowFile); final
BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
+ uploadContent(fileClient, bufferedIn, length);
+ } catch (Exception e) {
+ removeTempFile(fileClient);
+ throw e;
+ }
}
}
+ //Visible for testing
static void uploadContent(DataLakeFileClient fileClient, InputStream in,
long length) {
long chunkStart = 0;
long chunkSize;
@@ -190,4 +217,34 @@ public class PutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProcess
fileClient.flush(length);
}
+
+ //Visible for testing
+ DataLakeFileClient renameFile(final String fileName, final String
directoryPath, final DataLakeFileClient fileClient, final boolean overwrite) {
+ try {
+ final DataLakeRequestConditions destinationCondition = new
DataLakeRequestConditions();
+ if (!overwrite) {
+ destinationCondition.setIfNoneMatch("*");
+ }
+ final String destinationPath = createPath(directoryPath, fileName);
+ return fileClient.renameWithResponse(null, destinationPath, null,
destinationCondition, null, null).getValue();
+ } catch (DataLakeStorageException dataLakeStorageException) {
+ getLogger().error("Renaming File [{}] failed",
fileClient.getFileName(), dataLakeStorageException);
+ removeTempFile(fileClient);
+ throw dataLakeStorageException;
+ }
+ }
+
+ private String createPath(final String baseDirectory, final String path) {
+ return StringUtils.isNotBlank(baseDirectory)
+ ? baseDirectory + "/" + path
+ : path;
+ }
+
+ private void removeTempFile(final DataLakeFileClient fileClient) {
+ try {
+ fileClient.delete();
+ } catch (Exception e) {
+ getLogger().error("Deleting temporary File [{}] failed",
fileClient.getFileName(), e);
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
index 40e78d157c..2469ceafaa 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
@@ -28,27 +28,29 @@
<h3>File uploading and cleanup process</h3>
-<h4>New file</h4>
+<h4>New file upload</h4>
<ol>
- <li>An empty file is created.</li>
- <li>Content is appended to file.</li>
- <li>In case append failure the file is deleted.</li>
- <li>In case file deletion failure the empty file remains on the
server.</li>
+ <li>A temporary file is created with a random prefix under the given path in
'_nifitempdirectory'.</li>
+ <li>Content is appended to temp file.</li>
+ <li>Temp file is renamed to its original name, the original file is
overwritten.</li>
+ <li>In case of appending or renaming failure the temp file is deleted, the
original file remains intact.</li>
+ <li>In case of temporary file deletion failure both temp file and original
file remain on the server.</li>
</ol>
-<h4>Existing file</h4>
+<h4>Existing file upload</h4>
<ul>
<li>Processors with "fail" conflict resolution strategy will be directed
to "Failure" relationship.</li>
- <li>Processors with "ignore" conflict resolution strategy will be
directed to "Success" relationship.</li>
+ <li>Processors with "ignore" conflict resolution strategy will be directed
to "Success" relationship.</li>
<li>Processors with "replace" conflict resolution strategy:</li>
<ol>
- <li>An empty file overwrites the existing file, the original file is
lost.</li>
- <li>Content is appended to file.</li>
- <li>In case append failure the file is deleted.</li>
- <li>In case file deletion failure the empty file remains on the
server.</li>
+ <li>A temporary file is created with a random prefix under the given
path in '_nifitempdirectory'.</li>
+ <li>Content is appended to temp file.</li>
+ <li>Temp file is renamed to its original name, the original file is
overwritten.</li>
+ <li>In case of appending or renaming failure the temp file is deleted,
the original file remains intact.</li>
+ <li>In case of temporary file deletion failure both temp file and
original file remain on the server.</li>
</ol>
</ul>
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
index 6c45663e33..978fe9433f 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -37,6 +37,7 @@ import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
import static
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -57,6 +58,10 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
uploadFile(testFile1);
testFiles.put(testFile1.getFilePath(), testFile1);
+ TestFile testTempFile1 = new
TestFile(AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY,
"1234file1");
+ uploadFile(testTempFile1);
+ testFiles.put(testTempFile1.getFilePath(), testTempFile1);
+
TestFile testFile2 = new TestFile("", "file2");
uploadFile(testFile2);
testFiles.put(testFile2.getFilePath(), testFile2);
@@ -65,6 +70,10 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
createDirectoryAndUploadFile(testFile11);
testFiles.put(testFile11.getFilePath(), testFile11);
+ TestFile testTempFile11 = new TestFile(String.format("dir1/%s",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "5678file11");
+ uploadFile(testTempFile11);
+ testFiles.put(testTempFile11.getFilePath(), testTempFile11);
+
TestFile testFile12 = new TestFile("dir1", "file12");
uploadFile(testFile12);
testFiles.put(testFile12.getFilePath(), testFile12);
@@ -73,10 +82,18 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
createDirectoryAndUploadFile(testFile111);
testFiles.put(testFile111.getFilePath(), testFile111);
+ TestFile testTempFile111 = new TestFile(String.format("dir1/dir11/%s",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "9010file111");
+ uploadFile(testTempFile111);
+ testFiles.put(testTempFile111.getFilePath(), testTempFile111);
+
TestFile testFile21 = new TestFile("dir 2", "file 21", "Test");
createDirectoryAndUploadFile(testFile21);
testFiles.put(testFile21.getFilePath(), testFile21);
+ TestFile testTempFile21 = new TestFile(String.format("dir2/%s",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "1112file21",
"Test");
+ uploadFile(testTempFile21);
+ testFiles.put(testTempFile21.getFilePath(), testTempFile21);
+
createDirectory("dir3");
}
@@ -89,6 +106,20 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12",
"dir1/dir11/file111", "dir 2/file 21");
}
+ @Test
+ public void testListRootRecursiveWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("file1", "file2", "dir1/file11", "dir1/file12",
"dir1/dir11/file111", "dir 2/file 21",
+ String.format("%s/1234file1",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir2/%s/1112file21",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListRootRecursiveUsingProxyConfigurationService() throws
Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -109,6 +140,17 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2");
}
+ @Test
+ public void testListRootNonRecursiveWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("file1", "file2");
+ }
+
@Test
public void testListSubdirectoryRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
@@ -118,6 +160,18 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
+ @Test
+ public void testListSubdirectoryRecursiveWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListSubdirectoryNonRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
@@ -128,20 +182,45 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12");
}
+ @Test
+ public void testListSubdirectoryNonRecursiveWithTempFiles() throws
Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES,
"false");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12");
+ }
+
@Test
public void testListWithFileFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
- runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12",
"dir1/dir11/file111");
}
+ @Test
+ public void testListWithFileFilterWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("file1", "dir1/file11", "dir1/file12",
"dir1/dir11/file111",
+ String.format("%s/1234file1",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListWithFileFilterWithEL() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
- runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER,
"^file${suffix}$");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER,
".*file${suffix}$");
runner.setVariable("suffix", "1.*");
runProcessor();
@@ -149,6 +228,21 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "dir1/file11", "dir1/file12",
"dir1/dir11/file111");
}
+ @Test
+ public void testListWithFileFilterWithELWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER,
".*file${suffix}$");
+ runner.setVariable("suffix", "1.*");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("file1", "dir1/file11", "dir1/file12",
"dir1/dir11/file111",
+ String.format("%s/1234file1",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListRootWithPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -159,6 +253,19 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
+ @Test
+ public void testListRootWithPathFilterWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListRootWithPathFilterWithEL() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -171,6 +278,21 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
+ @Test
+ public void testListRootWithPathFilterWithELWithTempFiles() throws
Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER,
"${prefix}${suffix}");
+ runner.setVariable("prefix", "^dir");
+ runner.setVariable("suffix", "1.*$");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListSubdirectoryWithPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
@@ -181,6 +303,17 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/dir11/file111");
}
+ @Test
+ public void testListSubdirectoryWithPathFilterWithTempFiles() throws
Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("dir1/dir11/file111",
String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListRootWithFileAndPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -192,6 +325,19 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/dir11/file111");
}
+ @Test
+ public void testListRootWithFileAndPathFilterWithTempFiles() throws
Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/dir11/file111",
String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListEmptyDirectory() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir3");
@@ -236,6 +382,24 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
flowFile.assertAttributeEquals("record.count", "3");
}
+ @Test
+ public void testListWithRecordsWithTempFiles() throws
InitializationException {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"dir1");
+
+ MockRecordWriter recordWriter = new MockRecordWriter(null, false);
+ runner.addControllerService("record-writer", recordWriter);
+ runner.enableControllerService(recordWriter);
+ runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER,
"record-writer");
+
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runner.run();
+
+
runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "5");
+ }
+
@Test
public void testListWithMinAge() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -246,6 +410,17 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
}
+ @Test
+ public void testListWithMinAgeWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
+ }
+
@Test
public void testListWithMaxAge() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -256,6 +431,21 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12",
"dir1/dir11/file111", "dir 2/file 21");
}
+ @Test
+ public void testListWithMaxAgeWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("file1", "file2", "dir1/file11", "dir1/file12",
"dir1/dir11/file111", "dir 2/file 21",
+ String.format("%s/1234file1",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir2/%s/1112file21",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListWithMinSize() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -266,6 +456,20 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12",
"dir1/dir11/file111");
}
+ @Test
+ public void testListWithMinSizeWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("file1", "file2", "dir1/file11", "dir1/file12",
"dir1/dir11/file111",
+ String.format("%s/1234file1",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/%s/5678file11",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+ String.format("dir1/dir11/%s/9010file111",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
@Test
public void testListWithMaxSize() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
@@ -276,6 +480,17 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
assertSuccess("dir 2/file 21");
}
+ @Test
+ public void testListWithMaxSizeWithTempFiles() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY,
"");
+ runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
+ runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES,
"true");
+
+ runProcessor();
+
+ assertSuccess("dir 2/file 21", String.format("dir2/%s/1112file21",
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+ }
+
private void runProcessor() {
runner.assertValid();
runner.run();
@@ -288,6 +503,7 @@ public class ITListAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths));
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS);
+ assertEquals(expectedFiles.size(), flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
String filePath = flowFile.getAttribute("azure.filePath");
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 20efbb4372..f249b38a9e 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -16,12 +16,19 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.core.http.rest.Response;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -44,9 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@@ -255,15 +265,37 @@ public class ITPutAzureDataLakeStorage extends
AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileButFailedToAppend() {
- DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
- InputStream stream = mock(InputStream.class);
-
doThrow(NullPointerException.class).when(fileClient).append(any(InputStream.class),
anyLong(), anyLong());
+ final PutAzureDataLakeStorage processor = new
PutAzureDataLakeStorage();
+ final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
+ final ProcessSession session = mock(ProcessSession.class);
+ final FlowFile flowFile = mock(FlowFile.class);
- assertThrows(NullPointerException.class, () -> {
- PutAzureDataLakeStorage.uploadContent(fileClient, stream,
FILE_DATA.length);
+ when(flowFile.getSize()).thenReturn(1L);
+
doThrow(IllegalArgumentException.class).when(fileClient).append(any(InputStream.class),
anyLong(), anyLong());
- verify(fileClient).delete();
- });
+ assertThrows(IllegalArgumentException.class, () ->
processor.appendContent(flowFile, fileClient, session));
+ verify(fileClient).delete();
+ }
+
+ @Test
+ public void testPutFileButFailedToRename() {
+ final PutAzureDataLakeStorage processor = new
PutAzureDataLakeStorage();
+ final ProcessorInitializationContext initContext =
mock(ProcessorInitializationContext.class);
+ final String componentId = "componentId";
+ final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
+ final Response<DataLakeFileClient> response = mock(Response.class);
+ //Mock logger
+ when(initContext.getIdentifier()).thenReturn(componentId);
+ MockComponentLog componentLog = new MockComponentLog(componentId,
processor);
+ when(initContext.getLogger()).thenReturn(componentLog);
+ processor.initialize(initContext);
+ //Mock renameWithResponse Azure method
+ when(fileClient.renameWithResponse(isNull(), anyString(), isNull(),
any(DataLakeRequestConditions.class), isNull(), isNull())).thenReturn(response);
+ when(response.getValue()).thenThrow(DataLakeStorageException.class);
+ when(fileClient.getFileName()).thenReturn(FILE_NAME);
+
+ assertThrows(DataLakeStorageException.class, () ->
processor.renameFile(FILE_NAME, "", fileClient, false));
+ verify(fileClient).delete();
}
private Map<String, String> createAttributesMap() {