This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eb8d4ee06f NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event
eb8d4ee06f is described below

commit eb8d4ee06f92f5ac9671040277177204e4f72be0
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Sat Oct 26 21:51:37 2024 +0200

    NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9451.
---
 .../azure/storage/PutAzureDataLakeStorage.java     | 28 ++++++++++------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 5552fa3af1..8475e4bcec 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.util.Context;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -225,21 +227,18 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
 
     private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound,
                             final long transferSize, final DataLakeFileClient fileClient) throws Exception {
-        if (transferSize > 0) {
-            try (final InputStream inputStream = new BufferedInputStream(
-                    fileResourceFound.map(FileResource::getInputStream)
-                            .orElseGet(() -> session.read(flowFile)))
-            ) {
-                uploadContent(fileClient, inputStream, transferSize);
-            } catch (final Exception e) {
-                removeFile(fileClient);
-                throw e;
-            }
+        try (final InputStream inputStream = new BufferedInputStream(
+                fileResourceFound.map(FileResource::getInputStream)
+                        .orElseGet(() -> session.read(flowFile)))
+        ) {
+            uploadContent(fileClient, inputStream, transferSize);
+        } catch (final Exception e) {
+            removeFile(fileClient);
+            throw e;
         }
     }
 
-    //Visible for testing
-    static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException  {
+    private static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException  {
         long chunkStart = 0;
         long chunkSize;
 
@@ -258,8 +257,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
             chunkStart += chunkSize;
         }
 
-        // use overwrite mode due to https://github.com/Azure/azure-sdk-for-java/issues/31248
-        fileClient.flush(length, true);
+        fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE);
     }
 
     /**
@@ -272,7 +270,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
      * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
      * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
      */
-    DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
+    private DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
         final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);
 
         try {

Reply via email to