This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a94d26883 NIFI-15431 Fixed error handling in PutSmbFile to allow batch processing to continue
3a94d26883 is described below

commit 3a94d2688365a19695b1f0e7eeedae291c8801ed
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Jan 6 18:48:40 2026 +0100

    NIFI-15431 Fixed error handling in PutSmbFile to allow batch processing to continue
---
 .../org/apache/nifi/processors/smb/PutSmbFile.java | 175 +++++++++++----------
 .../apache/nifi/processors/smb/PutSmbFileTest.java |  25 +++
 2 files changed, 119 insertions(+), 81 deletions(-)

diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 2bab134bf2..4ca2d50a5d 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -261,7 +261,7 @@ public class PutSmbFile extends AbstractProcessor {
         return buildSmbClient(context);
     }
 
-    private void createMissingDirectoriesRecursevly(ComponentLog logger, 
DiskShare share, String pathToCreate) {
+    private void createMissingDirectoriesRecursively(ComponentLog logger, 
DiskShare share, String pathToCreate) {
         List<String> paths = new ArrayList<>();
 
         java.io.File file = new java.io.File(pathToCreate);
@@ -316,106 +316,119 @@ public class PutSmbFile extends AbstractProcessor {
             DiskShare share = (DiskShare) smbSession.connectShare(shareName)) {
 
             for (FlowFile flowFile : flowFiles) {
-                final long processingStartTime = System.nanoTime();
+                try {
+                    final long processingStartTime = System.nanoTime();
 
-                final String destinationDirectory = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-                final String destinationFilename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                    final String destinationDirectory = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+                    final String destinationFilename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
 
-                String destinationFullPath;
+                    String destinationFullPath;
 
-                // build destination path for the flowfile
-                if (destinationDirectory == null || 
destinationDirectory.isBlank()) {
-                    destinationFullPath = destinationFilename;
-                } else {
-                    destinationFullPath = new 
java.io.File(destinationDirectory, destinationFilename).getPath();
-                }
-
-                // handle missing directory
-                final String destinationFileParentDirectory = new 
java.io.File(destinationFullPath).getParent();
-                final Boolean createMissingDirectories = 
context.getProperty(CREATE_DIRS).asBoolean();
-                if (!createMissingDirectories && 
!share.folderExists(destinationFileParentDirectory)) {
-                    flowFile = session.penalize(flowFile);
-                    logger.warn("Penalizing {} and routing to failure as 
configured because the destination directory ({}) doesn't exist", flowFile, 
destinationFileParentDirectory);
-                    session.transfer(flowFile, REL_FAILURE);
-                    continue;
-                } else if 
(!share.folderExists(destinationFileParentDirectory)) {
-                    createMissingDirectoriesRecursevly(logger, share, 
destinationFileParentDirectory);
-                }
+                    // build destination path for the flowfile
+                    if (destinationDirectory == null || 
destinationDirectory.isBlank()) {
+                        destinationFullPath = destinationFilename;
+                    } else {
+                        destinationFullPath = new 
java.io.File(destinationDirectory, destinationFilename).getPath();
+                    }
 
-                // handle conflict resolution
-                final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
-                if (share.fileExists(destinationFullPath)) {
-                    if (conflictResolution.equals(IGNORE_RESOLUTION)) {
-                        session.transfer(flowFile, REL_SUCCESS);
-                        logger.info("Transferring {} to success as configured 
because file with same name already exists", flowFile);
-                        continue;
-                    } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
+                    // handle missing directory
+                    final String destinationFileParentDirectory = new 
java.io.File(destinationFullPath).getParent();
+                    final Boolean createMissingDirectories = 
context.getProperty(CREATE_DIRS).asBoolean();
+                    if (!createMissingDirectories && 
!share.folderExists(destinationFileParentDirectory)) {
+                        logger.warn("Penalizing {} and routing to failure as 
configured because the destination directory ({}) doesn't exist", flowFile, 
destinationFileParentDirectory);
                         flowFile = session.penalize(flowFile);
-                        logger.warn("Penalizing {} and routing to failure as 
configured because file with the same name already exists", flowFile);
                         session.transfer(flowFile, REL_FAILURE);
                         continue;
+                    } else if 
(!share.folderExists(destinationFileParentDirectory)) {
+                        try {
+                            createMissingDirectoriesRecursively(logger, share, 
destinationFileParentDirectory);
+                        } catch (Exception e) {
+                            logger.error("Penalizing {} and routing to failure 
because failed to create missing destination directories ({})", flowFile, 
destinationFileParentDirectory, e);
+                            flowFile = session.penalize(flowFile);
+                            session.transfer(flowFile, REL_FAILURE);
+                            continue;
+                        }
                     }
-                }
 
-                // handle temporary suffix
-                final String renameSuffixValue = 
context.getProperty(RENAME_SUFFIX).getValue();
-                final boolean renameSuffix = renameSuffixValue != null && 
!renameSuffixValue.isBlank();
-                StringBuilder finalDestinationFullPath = new 
StringBuilder(destinationFullPath);
-                if (renameSuffix) {
-                    finalDestinationFullPath.append(renameSuffixValue);
-                }
+                    // handle conflict resolution
+                    final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+                    if (share.fileExists(destinationFullPath)) {
+                        if (conflictResolution.equals(IGNORE_RESOLUTION)) {
+                            logger.info("Transferring {} to success as 
configured because file with same name already exists", flowFile);
+                            session.transfer(flowFile, REL_SUCCESS);
+                            continue;
+                        } else if (conflictResolution.equals(FAIL_RESOLUTION)) 
{
+                            logger.warn("Penalizing {} and routing to failure 
as configured because file with the same name already exists", flowFile);
+                            flowFile = session.penalize(flowFile);
+                            session.transfer(flowFile, REL_FAILURE);
+                            continue;
+                        }
+                    }
 
-                // handle the transfer
-                try (
-                    File shareDestinationFile = share.openFile(
-                        finalDestinationFullPath.toString(),
-                        EnumSet.of(AccessMask.GENERIC_WRITE),
-                        EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
-                        sharedAccess,
-                        SMB2CreateDisposition.FILE_OVERWRITE_IF,
-                        EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
-                    OutputStream shareDestinationFileOutputStream = 
shareDestinationFile.getOutputStream()) {
-                    session.exportTo(flowFile, 
shareDestinationFileOutputStream);
-                } catch (Exception e) {
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_FAILURE);
-                    logger.error("Cannot transfer the file. Penalizing {} and 
routing to 'failure'", flowFile, e);
-                    continue;
-                }
+                    // handle temporary suffix
+                    final String renameSuffixValue = 
context.getProperty(RENAME_SUFFIX).getValue();
+                    final boolean renameSuffix = renameSuffixValue != null && 
!renameSuffixValue.isBlank();
+                    StringBuilder finalDestinationFullPath = new 
StringBuilder(destinationFullPath);
+                    if (renameSuffix) {
+                        finalDestinationFullPath.append(renameSuffixValue);
+                    }
 
-                // handle the rename
-                if (renameSuffix) {
-                    try (DiskEntry fileDiskEntry = share.open(
-                        finalDestinationFullPath.toString(),
-                        EnumSet.of(AccessMask.DELETE, 
AccessMask.GENERIC_WRITE),
-                        EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
-                        sharedAccess,
-                        SMB2CreateDisposition.FILE_OPEN,
-                        EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
-
-                        // normalize path slashes for the network share
-                        destinationFullPath = destinationFullPath.replace("/", 
"\\");
-
-                        // rename the file on the share and replace it in case 
it exists
-                        fileDiskEntry.rename(destinationFullPath, true);
+                    // handle the transfer
+                    try (
+                            File shareDestinationFile = share.openFile(
+                                    finalDestinationFullPath.toString(),
+                                    EnumSet.of(AccessMask.GENERIC_WRITE),
+                                    
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+                                    sharedAccess,
+                                    SMB2CreateDisposition.FILE_OVERWRITE_IF,
+                                    
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
+                            OutputStream shareDestinationFileOutputStream = 
shareDestinationFile.getOutputStream()) {
+                        session.exportTo(flowFile, 
shareDestinationFileOutputStream);
                     } catch (Exception e) {
+                        logger.error("Cannot transfer the file. Penalizing {} 
and routing to 'failure'", flowFile, e);
                         flowFile = session.penalize(flowFile);
                         session.transfer(flowFile, REL_FAILURE);
-                        logger.error("Cannot rename the file. Penalizing {} 
and routing to 'failure'", flowFile, e);
                         continue;
                     }
-                }
 
-                // handle the success
-                final URI provenanceUri = new URI("smb", hostname, "/" + 
destinationFullPath.replace('\\', '/'), null);
-                final long processingTimeInNano = System.nanoTime() - 
processingStartTime;
-                final long processingTimeInMilli = 
TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
-                session.getProvenanceReporter().send(flowFile, 
provenanceUri.toString(), processingTimeInMilli);
-                session.transfer(flowFile, REL_SUCCESS);
+                    // handle the rename
+                    if (renameSuffix) {
+                        try (DiskEntry fileDiskEntry = share.open(
+                                finalDestinationFullPath.toString(),
+                                EnumSet.of(AccessMask.DELETE, 
AccessMask.GENERIC_WRITE),
+                                
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+                                sharedAccess,
+                                SMB2CreateDisposition.FILE_OPEN,
+                                
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
+
+                            // normalize path slashes for the network share
+                            destinationFullPath = 
destinationFullPath.replace("/", "\\");
+
+                            // rename the file on the share and replace it in 
case it exists
+                            fileDiskEntry.rename(destinationFullPath, true);
+                        } catch (Exception e) {
+                            logger.error("Cannot rename the file. Penalizing 
{} and routing to 'failure'", flowFile, e);
+                            flowFile = session.penalize(flowFile);
+                            session.transfer(flowFile, REL_FAILURE);
+                            continue;
+                        }
+                    }
+
+                    // handle the success
+                    final URI provenanceUri = new URI("smb", hostname, "/" + 
destinationFullPath.replace('\\', '/'), null);
+                    final long processingTimeInNano = System.nanoTime() - 
processingStartTime;
+                    final long processingTimeInMilli = 
TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
+                    session.getProvenanceReporter().send(flowFile, 
provenanceUri.toString(), processingTimeInMilli);
+                    session.transfer(flowFile, REL_SUCCESS);
+                } catch (Exception e) {
+                    logger.error("Error processing flowfile {}", flowFile, e);
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_FAILURE);
+                }
             }
         } catch (Exception e) {
-            session.transfer(flowFiles, REL_FAILURE);
             logger.error("Could not establish smb connection", e);
+            session.transfer(flowFiles, REL_FAILURE);
             smbClient.getServerList().unregister(hostname);
         }
     }
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
index cecb61d104..d6625614e8 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
@@ -52,6 +52,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -293,6 +294,30 @@ public class PutSmbFileTest {
         );
     }
 
+    @Test
+    public void testBatchCanContinueAfterDirectoryCreationFailure() throws 
IOException {
+        when(diskShare.folderExists(any())).thenReturn(false);
+        doThrow(new RuntimeException("Access 
denied")).when(diskShare).mkdir("dir2");
+
+        FlowFile flowFile1 = createFlowFileWithDirectoryAttribute(1, "dir1");
+        FlowFile flowFile2 = createFlowFileWithDirectoryAttribute(2, "dir2");
+        FlowFile flowFile3 = createFlowFileWithDirectoryAttribute(3, "dir3");
+
+        testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+        testRunner.setProperty(PutSmbFile.DIRECTORY, "${directory}");
+        testRunner.enqueue(flowFile1, flowFile2, flowFile3);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutSmbFile.REL_SUCCESS, 2);
+        testRunner.assertTransferCount(PutSmbFile.REL_FAILURE, 1);
+    }
+
+    private FlowFile createFlowFileWithDirectoryAttribute(long id, String 
directory) {
+        MockFlowFile flowFile = new MockFlowFile(id);
+        flowFile.putAttributes(Map.of("directory", directory));
+        return flowFile;
+    }
+
     @Test
     public void testFileShareNone() throws IOException {
         testRunner.setProperty(PutSmbFile.SHARE_ACCESS, 
PutSmbFile.SHARE_ACCESS_NONE);

Reply via email to