This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3a94d26883 NIFI-15431 Fixed error handling in PutSmbFile to allow
batch processing to continue
3a94d26883 is described below
commit 3a94d2688365a19695b1f0e7eeedae291c8801ed
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Jan 6 18:48:40 2026 +0100
NIFI-15431 Fixed error handling in PutSmbFile to allow batch processing to
continue
---
.../org/apache/nifi/processors/smb/PutSmbFile.java | 175 +++++++++++----------
.../apache/nifi/processors/smb/PutSmbFileTest.java | 25 +++
2 files changed, 119 insertions(+), 81 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 2bab134bf2..4ca2d50a5d 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -261,7 +261,7 @@ public class PutSmbFile extends AbstractProcessor {
return buildSmbClient(context);
}
- private void createMissingDirectoriesRecursevly(ComponentLog logger,
DiskShare share, String pathToCreate) {
+ private void createMissingDirectoriesRecursively(ComponentLog logger,
DiskShare share, String pathToCreate) {
List<String> paths = new ArrayList<>();
java.io.File file = new java.io.File(pathToCreate);
@@ -316,106 +316,119 @@ public class PutSmbFile extends AbstractProcessor {
DiskShare share = (DiskShare) smbSession.connectShare(shareName)) {
for (FlowFile flowFile : flowFiles) {
- final long processingStartTime = System.nanoTime();
+ try {
+ final long processingStartTime = System.nanoTime();
- final String destinationDirectory =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final String destinationFilename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String destinationDirectory =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+ final String destinationFilename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
- String destinationFullPath;
+ String destinationFullPath;
- // build destination path for the flowfile
- if (destinationDirectory == null ||
destinationDirectory.isBlank()) {
- destinationFullPath = destinationFilename;
- } else {
- destinationFullPath = new
java.io.File(destinationDirectory, destinationFilename).getPath();
- }
-
- // handle missing directory
- final String destinationFileParentDirectory = new
java.io.File(destinationFullPath).getParent();
- final Boolean createMissingDirectories =
context.getProperty(CREATE_DIRS).asBoolean();
- if (!createMissingDirectories &&
!share.folderExists(destinationFileParentDirectory)) {
- flowFile = session.penalize(flowFile);
- logger.warn("Penalizing {} and routing to failure as
configured because the destination directory ({}) doesn't exist", flowFile,
destinationFileParentDirectory);
- session.transfer(flowFile, REL_FAILURE);
- continue;
- } else if
(!share.folderExists(destinationFileParentDirectory)) {
- createMissingDirectoriesRecursevly(logger, share,
destinationFileParentDirectory);
- }
+ // build destination path for the flowfile
+ if (destinationDirectory == null ||
destinationDirectory.isBlank()) {
+ destinationFullPath = destinationFilename;
+ } else {
+ destinationFullPath = new
java.io.File(destinationDirectory, destinationFilename).getPath();
+ }
- // handle conflict resolution
- final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
- if (share.fileExists(destinationFullPath)) {
- if (conflictResolution.equals(IGNORE_RESOLUTION)) {
- session.transfer(flowFile, REL_SUCCESS);
- logger.info("Transferring {} to success as configured
because file with same name already exists", flowFile);
- continue;
- } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
+ // handle missing directory
+ final String destinationFileParentDirectory = new
java.io.File(destinationFullPath).getParent();
+ final Boolean createMissingDirectories =
context.getProperty(CREATE_DIRS).asBoolean();
+ if (!createMissingDirectories &&
!share.folderExists(destinationFileParentDirectory)) {
+ logger.warn("Penalizing {} and routing to failure as
configured because the destination directory ({}) doesn't exist", flowFile,
destinationFileParentDirectory);
flowFile = session.penalize(flowFile);
- logger.warn("Penalizing {} and routing to failure as
configured because file with the same name already exists", flowFile);
session.transfer(flowFile, REL_FAILURE);
continue;
+ } else if
(!share.folderExists(destinationFileParentDirectory)) {
+ try {
+ createMissingDirectoriesRecursively(logger, share,
destinationFileParentDirectory);
+ } catch (Exception e) {
+ logger.error("Penalizing {} and routing to failure
because failed to create missing destination directories ({})", flowFile,
destinationFileParentDirectory, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
}
- }
- // handle temporary suffix
- final String renameSuffixValue =
context.getProperty(RENAME_SUFFIX).getValue();
- final boolean renameSuffix = renameSuffixValue != null &&
!renameSuffixValue.isBlank();
- StringBuilder finalDestinationFullPath = new
StringBuilder(destinationFullPath);
- if (renameSuffix) {
- finalDestinationFullPath.append(renameSuffixValue);
- }
+ // handle conflict resolution
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+ if (share.fileExists(destinationFullPath)) {
+ if (conflictResolution.equals(IGNORE_RESOLUTION)) {
+ logger.info("Transferring {} to success as
configured because file with same name already exists", flowFile);
+ session.transfer(flowFile, REL_SUCCESS);
+ continue;
+ } else if (conflictResolution.equals(FAIL_RESOLUTION))
{
+ logger.warn("Penalizing {} and routing to failure
as configured because file with the same name already exists", flowFile);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
+ }
- // handle the transfer
- try (
- File shareDestinationFile = share.openFile(
- finalDestinationFullPath.toString(),
- EnumSet.of(AccessMask.GENERIC_WRITE),
- EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
- sharedAccess,
- SMB2CreateDisposition.FILE_OVERWRITE_IF,
- EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
- OutputStream shareDestinationFileOutputStream =
shareDestinationFile.getOutputStream()) {
- session.exportTo(flowFile,
shareDestinationFileOutputStream);
- } catch (Exception e) {
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- logger.error("Cannot transfer the file. Penalizing {} and
routing to 'failure'", flowFile, e);
- continue;
- }
+ // handle temporary suffix
+ final String renameSuffixValue =
context.getProperty(RENAME_SUFFIX).getValue();
+ final boolean renameSuffix = renameSuffixValue != null &&
!renameSuffixValue.isBlank();
+ StringBuilder finalDestinationFullPath = new
StringBuilder(destinationFullPath);
+ if (renameSuffix) {
+ finalDestinationFullPath.append(renameSuffixValue);
+ }
- // handle the rename
- if (renameSuffix) {
- try (DiskEntry fileDiskEntry = share.open(
- finalDestinationFullPath.toString(),
- EnumSet.of(AccessMask.DELETE,
AccessMask.GENERIC_WRITE),
- EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
- sharedAccess,
- SMB2CreateDisposition.FILE_OPEN,
- EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
-
- // normalize path slashes for the network share
- destinationFullPath = destinationFullPath.replace("/",
"\\");
-
- // rename the file on the share and replace it in case
it exists
- fileDiskEntry.rename(destinationFullPath, true);
+ // handle the transfer
+ try (
+ File shareDestinationFile = share.openFile(
+ finalDestinationFullPath.toString(),
+ EnumSet.of(AccessMask.GENERIC_WRITE),
+
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+ sharedAccess,
+ SMB2CreateDisposition.FILE_OVERWRITE_IF,
+
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
+ OutputStream shareDestinationFileOutputStream =
shareDestinationFile.getOutputStream()) {
+ session.exportTo(flowFile,
shareDestinationFileOutputStream);
} catch (Exception e) {
+ logger.error("Cannot transfer the file. Penalizing {}
and routing to 'failure'", flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
- logger.error("Cannot rename the file. Penalizing {}
and routing to 'failure'", flowFile, e);
continue;
}
- }
- // handle the success
- final URI provenanceUri = new URI("smb", hostname, "/" +
destinationFullPath.replace('\\', '/'), null);
- final long processingTimeInNano = System.nanoTime() -
processingStartTime;
- final long processingTimeInMilli =
TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
- session.getProvenanceReporter().send(flowFile,
provenanceUri.toString(), processingTimeInMilli);
- session.transfer(flowFile, REL_SUCCESS);
+ // handle the rename
+ if (renameSuffix) {
+ try (DiskEntry fileDiskEntry = share.open(
+ finalDestinationFullPath.toString(),
+ EnumSet.of(AccessMask.DELETE,
AccessMask.GENERIC_WRITE),
+
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+ sharedAccess,
+ SMB2CreateDisposition.FILE_OPEN,
+
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
+
+ // normalize path slashes for the network share
+ destinationFullPath =
destinationFullPath.replace("/", "\\");
+
+ // rename the file on the share and replace it in
case it exists
+ fileDiskEntry.rename(destinationFullPath, true);
+ } catch (Exception e) {
+ logger.error("Cannot rename the file. Penalizing
{} and routing to 'failure'", flowFile, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
+ }
+
+ // handle the success
+ final URI provenanceUri = new URI("smb", hostname, "/" +
destinationFullPath.replace('\\', '/'), null);
+ final long processingTimeInNano = System.nanoTime() -
processingStartTime;
+ final long processingTimeInMilli =
TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
+ session.getProvenanceReporter().send(flowFile,
provenanceUri.toString(), processingTimeInMilli);
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ logger.error("Error processing flowfile {}", flowFile, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
}
} catch (Exception e) {
- session.transfer(flowFiles, REL_FAILURE);
logger.error("Could not establish smb connection", e);
+ session.transfer(flowFiles, REL_FAILURE);
smbClient.getServerList().unregister(hostname);
}
}
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
index cecb61d104..d6625614e8 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
@@ -52,6 +52,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -293,6 +294,30 @@ public class PutSmbFileTest {
);
}
+ @Test
+ public void testBatchCanContinueAfterDirectoryCreationFailure() throws
IOException {
+ when(diskShare.folderExists(any())).thenReturn(false);
+ doThrow(new RuntimeException("Access
denied")).when(diskShare).mkdir("dir2");
+
+ FlowFile flowFile1 = createFlowFileWithDirectoryAttribute(1, "dir1");
+ FlowFile flowFile2 = createFlowFileWithDirectoryAttribute(2, "dir2");
+ FlowFile flowFile3 = createFlowFileWithDirectoryAttribute(3, "dir3");
+
+ testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
+ testRunner.setProperty(PutSmbFile.DIRECTORY, "${directory}");
+ testRunner.enqueue(flowFile1, flowFile2, flowFile3);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutSmbFile.REL_SUCCESS, 2);
+ testRunner.assertTransferCount(PutSmbFile.REL_FAILURE, 1);
+ }
+
+ private FlowFile createFlowFileWithDirectoryAttribute(long id, String
directory) {
+ MockFlowFile flowFile = new MockFlowFile(id);
+ flowFile.putAttributes(Map.of("directory", directory));
+ return flowFile;
+ }
+
@Test
public void testFileShareNone() throws IOException {
testRunner.setProperty(PutSmbFile.SHARE_ACCESS,
PutSmbFile.SHARE_ACCESS_NONE);