This is an automated email from the ASF dual-hosted git repository.

jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 22e8901fce NIFI-8248 Modified PutAzureDataLakeStorage processor to use temp file instead of inline replacement
22e8901fce is described below

commit 22e8901fced6f56d9913cec240458e3268449873
Author: Timea Barna <[email protected]>
AuthorDate: Mon Jun 27 07:50:29 2022 +0200

    NIFI-8248 Modified PutAzureDataLakeStorage processor to use temp file instead of inline replacement
    
    This closes #6159
    
    Signed-off-by: Joey Frazee <[email protected]>
---
 .../AbstractAzureDataLakeStorageProcessor.java     |   2 +
 .../azure/storage/ListAzureDataLakeStorage.java    |  13 ++
 .../azure/storage/PutAzureDataLakeStorage.java     | 103 +++++++---
 .../additionalDetails.html                         |  24 +--
 .../azure/storage/ITListAzureDataLakeStorage.java  | 220 ++++++++++++++++++++-
 .../azure/storage/ITPutAzureDataLakeStorage.java   |  46 ++++-
 6 files changed, 365 insertions(+), 43 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 3047de5f6e..1a66dda500 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -101,6 +101,8 @@ public abstract class AbstractAzureDataLakeStorageProcessor 
extends AbstractProc
             REL_FAILURE
     )));
 
+    public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
+
     @Override
     public Set<Relationship> getRelationships() {
         return RELATIONSHIPS;
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index cc43ffd567..24f51b5ae2 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -63,6 +63,7 @@ import static 
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_T
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
+import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
@@ -129,6 +130,15 @@ public class ListAzureDataLakeStorage extends 
AbstractListAzureProcessor<ADLSFil
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor INCLUDE_TEMPORARY_FILES = new 
PropertyDescriptor.Builder()
+            .name("include-temporary-files")
+            .displayName("Include Temporary Files")
+            .description("Whether to include temporary files when listing the 
contents of configured directory paths.")
+            .required(true)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
             ADLS_CREDENTIALS_SERVICE,
             FILESYSTEM,
@@ -136,6 +146,7 @@ public class ListAzureDataLakeStorage extends 
AbstractListAzureProcessor<ADLSFil
             RECURSE_SUBDIRECTORIES,
             FILE_FILTER,
             PATH_FILTER,
+            INCLUDE_TEMPORARY_FILES,
             RECORD_WRITER,
             LISTING_STRATEGY,
             TRACKING_STATE_CACHE,
@@ -261,10 +272,12 @@ public class ListAzureDataLakeStorage extends 
AbstractListAzureProcessor<ADLSFil
             options.setRecursive(recurseSubdirectories);
 
             final Pattern baseDirectoryPattern = Pattern.compile("^" + 
baseDirectory + "/?");
+            final boolean includeTempFiles = 
context.getProperty(INCLUDE_TEMPORARY_FILES).asBoolean();
             final long minimumTimestamp = minTimestamp == null ? 0 : 
minTimestamp;
 
             final List<ADLSFileInfo> listing = 
fileSystemClient.listPaths(options, null).stream()
                     .filter(pathItem -> !pathItem.isDirectory())
+                    .filter(pathItem -> includeTempFiles || 
!pathItem.getName().contains(TEMP_FILE_DIRECTORY))
                     .filter(pathItem -> 
isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, 
pathItem.getLastModified().toInstant().toEpochMilli(), 
pathItem.getContentLength()))
                     .map(pathItem -> new ADLSFileInfo.Builder()
                             .fileSystem(fileSystem)
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 3c10d068b0..dbd6138e15 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -20,6 +20,7 @@ import 
com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -30,20 +31,24 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.BufferedInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
@@ -83,11 +88,23 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
             .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, 
IGNORE_RESOLUTION)
             .build();
 
+    public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder() // parent path under which the temp directory is created
+            .name("base-temporary-path")
+            .displayName("Base Temporary Path")
+            .description("The Path where the temporary directory will be created. The Path name cannot contain a leading '/'." +
+                    " The root directory can be designated by the empty string value. Non-existing directories will be created." +
+                    " The Temporary File Directory name is " + TEMP_FILE_DIRECTORY) // leading space keeps this sentence separated from the previous one
+            .defaultValue("")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(new DirectoryValidator("Base Temporary Path"))
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
             ADLS_CREDENTIALS_SERVICE,
             FILESYSTEM,
             DIRECTORY,
             FILE,
+            BASE_TEMPORARY_PATH,
             CONFLICT_RESOLUTION,
             AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
     ));
@@ -107,41 +124,39 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
         final long startNanos = System.nanoTime();
         try {
             final String fileSystem = evaluateFileSystemProperty(context, 
flowFile);
-            final String directory = evaluateDirectoryProperty(context, 
flowFile);
+            final String originalDirectory = 
evaluateDirectoryProperty(context, flowFile);
+            final String tempPath = evaluateDirectoryProperty(context, 
flowFile, BASE_TEMPORARY_PATH);
+            final String tempDirectory = createPath(tempPath, 
TEMP_FILE_DIRECTORY);
             final String fileName = evaluateFileNameProperty(context, 
flowFile);
 
             final DataLakeServiceClient storageClient = 
getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = 
storageClient.getFileSystemClient(fileSystem);
-            final DataLakeDirectoryClient directoryClient = 
fileSystemClient.getDirectoryClient(directory);
-            final DataLakeFileClient fileClient;
+            final DataLakeDirectoryClient directoryClient = 
fileSystemClient.getDirectoryClient(originalDirectory);
+            final DataLakeFileClient tempFileClient;
+            final DataLakeFileClient renamedFileClient;
 
+            final String tempFilePrefix = UUID.randomUUID().toString();
+            final DataLakeDirectoryClient tempDirectoryClient = 
fileSystemClient.getDirectoryClient(tempDirectory);
             final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
             boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
 
             try {
-                fileClient = directoryClient.createFile(fileName, overwrite);
-
-                final long length = flowFile.getSize();
-                if (length > 0) {
-                    try (final InputStream rawIn = session.read(flowFile); 
final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                        uploadContent(fileClient, bufferedIn, length);
-                    } catch (Exception e) {
-                        removeTempFile(fileClient);
-                        throw e;
-                    }
-                }
+                tempFileClient = tempDirectoryClient.createFile(tempFilePrefix 
+ fileName, true);
+                appendContent(flowFile, tempFileClient, session);
+                createDirectoryIfNotExists(directoryClient);
+                renamedFileClient = renameFile(fileName, 
directoryClient.getDirectoryPath(), tempFileClient, overwrite);
 
                 final Map<String, String> attributes = new HashMap<>();
                 attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
-                attributes.put(ATTR_NAME_DIRECTORY, directory);
+                attributes.put(ATTR_NAME_DIRECTORY, originalDirectory);
                 attributes.put(ATTR_NAME_FILENAME, fileName);
-                attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl());
-                attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                attributes.put(ATTR_NAME_PRIMARY_URI, 
renamedFileClient.getFileUrl());
+                attributes.put(ATTR_NAME_LENGTH, 
String.valueOf(flowFile.getSize()));
                 flowFile = session.putAllAttributes(flowFile, attributes);
 
                 session.transfer(flowFile, REL_SUCCESS);
                 final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                session.getProvenanceReporter().send(flowFile, 
fileClient.getFileUrl(), transferMillis);
+                session.getProvenanceReporter().send(flowFile, 
renamedFileClient.getFileUrl(), transferMillis);
             } catch (DataLakeStorageException dlsException) {
                 if (dlsException.getStatusCode() == 409) {
                     if (conflictResolution.equals(IGNORE_RESOLUTION)) {
@@ -164,14 +179,26 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
         }
     }
 
-    private void removeTempFile(DataLakeFileClient fileClient) {
-        try {
-            fileClient.delete();
-        } catch (Exception e) {
-            getLogger().error("Error while removing temp file on Azure Data 
Lake Storage", e);
+    private void createDirectoryIfNotExists(DataLakeDirectoryClient 
directoryClient) {
+        if (!directoryClient.getDirectoryPath().isEmpty() && 
!directoryClient.exists()) {
+            directoryClient.create();
+        }
+    }
+
+    //Visible for testing
+    void appendContent(FlowFile flowFile, DataLakeFileClient fileClient, 
ProcessSession session) throws IOException {
+        final long length = flowFile.getSize();
+        if (length > 0) {
+            try (final InputStream rawIn = session.read(flowFile); final 
BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
+                uploadContent(fileClient, bufferedIn, length);
+            } catch (Exception e) {
+                removeTempFile(fileClient);
+                throw e;
+            }
         }
     }
 
+    //Visible for testing
     static void uploadContent(DataLakeFileClient fileClient, InputStream in, 
long length) {
         long chunkStart = 0;
         long chunkSize;
@@ -190,4 +217,34 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
 
         fileClient.flush(length);
     }
+
+    //Visible for testing
+    DataLakeFileClient renameFile(final String fileName, final String 
directoryPath, final DataLakeFileClient fileClient, final boolean overwrite) {
+        try {
+            final DataLakeRequestConditions destinationCondition = new 
DataLakeRequestConditions();
+            if (!overwrite) {
+                destinationCondition.setIfNoneMatch("*");
+            }
+            final String destinationPath = createPath(directoryPath, fileName);
+            return fileClient.renameWithResponse(null, destinationPath, null, 
destinationCondition, null, null).getValue();
+        } catch (DataLakeStorageException dataLakeStorageException) {
+            getLogger().error("Renaming File [{}] failed", 
fileClient.getFileName(), dataLakeStorageException);
+            removeTempFile(fileClient);
+            throw dataLakeStorageException;
+        }
+    }
+
+    private String createPath(final String baseDirectory, final String path) {
+        return StringUtils.isNotBlank(baseDirectory)
+                ? baseDirectory + "/" + path
+                : path;
+    }
+
+    private void removeTempFile(final DataLakeFileClient fileClient) { // best-effort cleanup of the temp file after a failed append or rename
+        try {
+            fileClient.delete();
+        } catch (Exception e) { // logged, not rethrown, so a cleanup failure does not replace the error the caller is about to rethrow
+            getLogger().error("Removing Temp File [{}] failed", fileClient.getFileName(), e);
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
index 40e78d157c..2469ceafaa 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
@@ -28,27 +28,29 @@
 
 <h3>File uploading and cleanup process</h3>
 
-<h4>New file</h4>
+<h4>New file upload</h4>
 
 <ol>
-    <li>An empty file is created.</li>
-    <li>Content is appended to file.</li>
-    <li>In case append failure the file is deleted.</li>
-    <li>In case file deletion failure the empty file remains on the 
server.</li>
+    <li>A temporary file is created with random prefix under the given path in 
'_nifitempdirectory'.</li>
+    <li>Content is appended to temp file.</li>
+    <li>Temp file is renamed to its original name, the original file is 
overwritten.</li>
+    <li>In case of appending or renaming failure the temp file is deleted, the 
original file remains intact.</li>
+    <li>In case of temporary file deletion failure both temp file and original 
file remain on the server.</li>
 </ol>
 
-<h4>Existing file</h4>
+<h4>Existing file upload</h4>
 
 <ul>
     <li>Processors with "fail" conflict resolution strategy will be directed 
to "Failure" relationship.</li>
-     <li>Processors with "ignore" conflict resolution strategy will be 
directed to "Success" relationship.</li>
+    <li>Processors with "ignore" conflict resolution strategy will be directed 
to "Success" relationship.</li>
     <li>Processors with "replace" conflict resolution strategy:</li>
 
     <ol>
-        <li>An empty file overwrites the existing file, the original file is 
lost.</li>
-        <li>Content is appended to file.</li>
-        <li>In case append failure the file is deleted.</li>
-        <li>In case file deletion failure the empty file remains on the 
server.</li>
+        <li>A temporary file is created with random prefix under the given 
path in '_nifitempdirectory'.</li>
+        <li>Content is appended to temp file.</li>
+        <li>Temp file is renamed to its original name, the original file is 
overwritten.</li>
+        <li>In case of appending or renaming failure the temp file is deleted, 
the original file remains intact.</li>
+        <li>In case of temporary file deletion failure both temp file and 
original file remain on the server.</li>
     </ol>
 </ul>
 
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
index 6c45663e33..978fe9433f 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -37,6 +37,7 @@ import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -57,6 +58,10 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         uploadFile(testFile1);
         testFiles.put(testFile1.getFilePath(), testFile1);
 
+        TestFile testTempFile1 = new 
TestFile(AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY, 
"1234file1");
+        uploadFile(testTempFile1);
+        testFiles.put(testTempFile1.getFilePath(), testTempFile1);
+
         TestFile testFile2 = new TestFile("", "file2");
         uploadFile(testFile2);
         testFiles.put(testFile2.getFilePath(), testFile2);
@@ -65,6 +70,10 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         createDirectoryAndUploadFile(testFile11);
         testFiles.put(testFile11.getFilePath(), testFile11);
 
+        TestFile testTempFile11 = new TestFile(String.format("dir1/%s", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "5678file11");
+        uploadFile(testTempFile11);
+        testFiles.put(testTempFile11.getFilePath(), testTempFile11);
+
         TestFile testFile12 = new TestFile("dir1", "file12");
         uploadFile(testFile12);
         testFiles.put(testFile12.getFilePath(), testFile12);
@@ -73,10 +82,18 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         createDirectoryAndUploadFile(testFile111);
         testFiles.put(testFile111.getFilePath(), testFile111);
 
+        TestFile testTempFile111 = new TestFile(String.format("dir1/dir11/%s", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "9010file111");
+        uploadFile(testTempFile111);
+        testFiles.put(testTempFile111.getFilePath(), testTempFile111);
+
         TestFile testFile21 = new TestFile("dir 2", "file 21", "Test");
         createDirectoryAndUploadFile(testFile21);
         testFiles.put(testFile21.getFilePath(), testFile21);
 
+        TestFile testTempFile21 = new TestFile(String.format("dir2/%s", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "1112file21", 
"Test");
+        uploadFile(testTempFile21);
+        testFiles.put(testTempFile21.getFilePath(), testTempFile21);
+
         createDirectory("dir3");
     }
 
@@ -89,6 +106,20 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111", "dir 2/file 21");
     }
 
+    @Test
+    public void testListRootRecursiveWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111", "dir 2/file 21",
+                String.format("%s/1234file1", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir2/%s/1112file21", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListRootRecursiveUsingProxyConfigurationService() throws 
Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -109,6 +140,17 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("file1", "file2");
     }
 
+    @Test
+    public void testListRootNonRecursiveWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, 
"false");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("file1", "file2");
+    }
+
     @Test
     public void testListSubdirectoryRecursive() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
@@ -118,6 +160,18 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
     }
 
+    @Test
+    public void testListSubdirectoryRecursiveWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListSubdirectoryNonRecursive() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
@@ -128,20 +182,45 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("dir1/file11", "dir1/file12");
     }
 
+    @Test
+    public void testListSubdirectoryNonRecursiveWithTempFiles() throws 
Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
+        runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, 
"false");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12");
+    }
+
     @Test
     public void testListWithFileFilter() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
-        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
 
         runProcessor();
 
         assertSuccess("file1", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111");
     }
 
+    @Test
+    public void testListWithFileFilterWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("file1", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111",
+                String.format("%s/1234file1", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListWithFileFilterWithEL() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
-        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, 
"^file${suffix}$");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, 
".*file${suffix}$");
         runner.setVariable("suffix", "1.*");
 
         runProcessor();
@@ -149,6 +228,21 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("file1", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111");
     }
 
+    @Test
+    public void testListWithFileFilterWithELWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, 
".*file${suffix}$");
+        runner.setVariable("suffix", "1.*");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("file1", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111",
+                String.format("%s/1234file1", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListRootWithPathFilter() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -159,6 +253,19 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
     }
 
+    @Test
+    public void testListRootWithPathFilterWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListRootWithPathFilterWithEL() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -171,6 +278,21 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
     }
 
+    @Test
+    public void testListRootWithPathFilterWithELWithTempFiles() throws 
Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, 
"${prefix}${suffix}");
+        runner.setVariable("prefix", "^dir");
+        runner.setVariable("suffix", "1.*$");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListSubdirectoryWithPathFilter() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
@@ -181,6 +303,17 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("dir1/dir11/file111");
     }
 
+    @Test
+    public void testListSubdirectoryWithPathFilterWithTempFiles() throws 
Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("dir1/dir11/file111", 
String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListRootWithFileAndPathFilter() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -192,6 +325,19 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("dir1/file11", "dir1/dir11/file111");
     }
 
+    @Test
+    public void testListRootWithFileAndPathFilterWithTempFiles() throws 
Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/dir11/file111", 
String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListEmptyDirectory() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir3");
@@ -236,6 +382,24 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         flowFile.assertAttributeEquals("record.count", "3");
     }
 
+    @Test
+    public void testListWithRecordsWithTempFiles() throws 
InitializationException {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, false);
+        runner.addControllerService("record-writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, 
"record-writer");
+
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "5");
+    }
+
     @Test
     public void testListWithMinAge() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -246,6 +410,17 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
     }
 
+    @Test
+    public void testListWithMinAgeWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
+    }
+
     @Test
     public void testListWithMaxAge() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -256,6 +431,21 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111", "dir 2/file 21");
     }
 
+    @Test
+    public void testListWithMaxAgeWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111", "dir 2/file 21",
+                String.format("%s/1234file1", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir2/%s/1112file21", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListWithMinSize() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -266,6 +456,20 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111");
     }
 
+    @Test
+    public void testListWithMinSizeWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", 
"dir1/dir11/file111",
+                String.format("%s/1234file1", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/%s/5678file11", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
+                String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     @Test
     public void testListWithMaxSize() throws Exception {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
@@ -276,6 +480,17 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess("dir 2/file 21");
     }
 
+    @Test
+    public void testListWithMaxSizeWithTempFiles() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
+        runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
+        runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, 
"true");
+
+        runProcessor();
+
+        assertSuccess("dir 2/file 21", String.format("dir2/%s/1112file21", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
+    }
+
     private void runProcessor() {
         runner.assertValid();
         runner.run();
@@ -288,6 +503,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths));
 
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS);
+        assertEquals(expectedFiles.size(), flowFiles.size());
 
         for (MockFlowFile flowFile : flowFiles) {
             String filePath = flowFile.getAttribute("azure.filePath");
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 20efbb4372..f249b38a9e 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -16,12 +16,19 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.http.rest.Response;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -44,9 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
 
@@ -255,15 +265,37 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
 
     @Test
     public void testPutFileButFailedToAppend() {
-        DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
-        InputStream stream = mock(InputStream.class);
-        
doThrow(NullPointerException.class).when(fileClient).append(any(InputStream.class),
 anyLong(), anyLong());
+        final PutAzureDataLakeStorage processor = new 
PutAzureDataLakeStorage();
+        final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        final FlowFile flowFile = mock(FlowFile.class);
 
-        assertThrows(NullPointerException.class, () -> {
-            PutAzureDataLakeStorage.uploadContent(fileClient, stream, 
FILE_DATA.length);
+        when(flowFile.getSize()).thenReturn(1L);
+        
doThrow(IllegalArgumentException.class).when(fileClient).append(any(InputStream.class),
 anyLong(), anyLong());
 
-            verify(fileClient).delete();
-        });
+        assertThrows(IllegalArgumentException.class, () -> 
processor.appendContent(flowFile, fileClient, session));
+        verify(fileClient).delete();
+    }
+
+    @Test
+    public void testPutFileButFailedToRename() {
+        final PutAzureDataLakeStorage processor = new 
PutAzureDataLakeStorage();
+        final ProcessorInitializationContext initContext = 
mock(ProcessorInitializationContext.class);
+        final String componentId = "componentId";
+        final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
+        final Response<DataLakeFileClient> response = mock(Response.class);
+        //Mock logger
+        when(initContext.getIdentifier()).thenReturn(componentId);
+        MockComponentLog componentLog = new MockComponentLog(componentId, 
processor);
+        when(initContext.getLogger()).thenReturn(componentLog);
+        processor.initialize(initContext);
+        //Mock renameWithResponse Azure method
+        when(fileClient.renameWithResponse(isNull(), anyString(), isNull(), 
any(DataLakeRequestConditions.class), isNull(), isNull())).thenReturn(response);
+        when(response.getValue()).thenThrow(DataLakeStorageException.class);
+        when(fileClient.getFileName()).thenReturn(FILE_NAME);
+
+        assertThrows(DataLakeStorageException.class, () -> 
processor.renameFile(FILE_NAME, "", fileClient, false));
+        verify(fileClient).delete();
     }
 
     private Map<String, String> createAttributesMap() {

Reply via email to