pvillard31 commented on code in PR #10734:
URL: https://github.com/apache/nifi/pull/10734#discussion_r2675775350
##########
nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java:
##########
@@ -316,106 +316,119 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
DiskShare share = (DiskShare) smbSession.connectShare(shareName)) {
for (FlowFile flowFile : flowFiles) {
- final long processingStartTime = System.nanoTime();
+ try {
+ final long processingStartTime = System.nanoTime();
- final String destinationDirectory =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final String destinationFilename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String destinationDirectory =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+ final String destinationFilename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
- String destinationFullPath;
+ String destinationFullPath;
- // build destination path for the flowfile
- if (destinationDirectory == null ||
destinationDirectory.isBlank()) {
- destinationFullPath = destinationFilename;
- } else {
- destinationFullPath = new
java.io.File(destinationDirectory, destinationFilename).getPath();
- }
-
- // handle missing directory
- final String destinationFileParentDirectory = new
java.io.File(destinationFullPath).getParent();
- final Boolean createMissingDirectories =
context.getProperty(CREATE_DIRS).asBoolean();
- if (!createMissingDirectories &&
!share.folderExists(destinationFileParentDirectory)) {
- flowFile = session.penalize(flowFile);
- logger.warn("Penalizing {} and routing to failure as
configured because the destination directory ({}) doesn't exist", flowFile,
destinationFileParentDirectory);
- session.transfer(flowFile, REL_FAILURE);
- continue;
- } else if
(!share.folderExists(destinationFileParentDirectory)) {
- createMissingDirectoriesRecursevly(logger, share,
destinationFileParentDirectory);
- }
+ // build destination path for the flowfile
+ if (destinationDirectory == null ||
destinationDirectory.isBlank()) {
+ destinationFullPath = destinationFilename;
+ } else {
+ destinationFullPath = new
java.io.File(destinationDirectory, destinationFilename).getPath();
+ }
- // handle conflict resolution
- final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
- if (share.fileExists(destinationFullPath)) {
- if (conflictResolution.equals(IGNORE_RESOLUTION)) {
- session.transfer(flowFile, REL_SUCCESS);
- logger.info("Transferring {} to success as configured
because file with same name already exists", flowFile);
- continue;
- } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
+ // handle missing directory
+ final String destinationFileParentDirectory = new
java.io.File(destinationFullPath).getParent();
+ final Boolean createMissingDirectories =
context.getProperty(CREATE_DIRS).asBoolean();
+ if (!createMissingDirectories &&
!share.folderExists(destinationFileParentDirectory)) {
+ logger.warn("Penalizing {} and routing to failure as
configured because the destination directory ({}) doesn't exist", flowFile,
destinationFileParentDirectory);
flowFile = session.penalize(flowFile);
- logger.warn("Penalizing {} and routing to failure as
configured because file with the same name already exists", flowFile);
session.transfer(flowFile, REL_FAILURE);
continue;
+ } else if
(!share.folderExists(destinationFileParentDirectory)) {
+ try {
+ createMissingDirectoriesRecursively(logger, share,
destinationFileParentDirectory);
+ } catch (Exception e) {
+ logger.error("Penalizing {} and routing to failure
because failed to create missing destination directories ({})", flowFile,
destinationFileParentDirectory);
Review Comment:
Do we want to include the exception in the log message?
##########
nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java:
##########
@@ -316,106 +316,119 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
DiskShare share = (DiskShare) smbSession.connectShare(shareName)) {
for (FlowFile flowFile : flowFiles) {
- final long processingStartTime = System.nanoTime();
+ try {
+ final long processingStartTime = System.nanoTime();
- final String destinationDirectory =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final String destinationFilename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String destinationDirectory =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+ final String destinationFilename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
- String destinationFullPath;
+ String destinationFullPath;
- // build destination path for the flowfile
- if (destinationDirectory == null ||
destinationDirectory.isBlank()) {
- destinationFullPath = destinationFilename;
- } else {
- destinationFullPath = new
java.io.File(destinationDirectory, destinationFilename).getPath();
- }
-
- // handle missing directory
- final String destinationFileParentDirectory = new
java.io.File(destinationFullPath).getParent();
- final Boolean createMissingDirectories =
context.getProperty(CREATE_DIRS).asBoolean();
- if (!createMissingDirectories &&
!share.folderExists(destinationFileParentDirectory)) {
- flowFile = session.penalize(flowFile);
- logger.warn("Penalizing {} and routing to failure as
configured because the destination directory ({}) doesn't exist", flowFile,
destinationFileParentDirectory);
- session.transfer(flowFile, REL_FAILURE);
- continue;
- } else if
(!share.folderExists(destinationFileParentDirectory)) {
- createMissingDirectoriesRecursevly(logger, share,
destinationFileParentDirectory);
- }
+ // build destination path for the flowfile
+ if (destinationDirectory == null ||
destinationDirectory.isBlank()) {
+ destinationFullPath = destinationFilename;
+ } else {
+ destinationFullPath = new
java.io.File(destinationDirectory, destinationFilename).getPath();
+ }
- // handle conflict resolution
- final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
- if (share.fileExists(destinationFullPath)) {
- if (conflictResolution.equals(IGNORE_RESOLUTION)) {
- session.transfer(flowFile, REL_SUCCESS);
- logger.info("Transferring {} to success as configured
because file with same name already exists", flowFile);
- continue;
- } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
+ // handle missing directory
+ final String destinationFileParentDirectory = new
java.io.File(destinationFullPath).getParent();
+ final Boolean createMissingDirectories =
context.getProperty(CREATE_DIRS).asBoolean();
+ if (!createMissingDirectories &&
!share.folderExists(destinationFileParentDirectory)) {
+ logger.warn("Penalizing {} and routing to failure as
configured because the destination directory ({}) doesn't exist", flowFile,
destinationFileParentDirectory);
flowFile = session.penalize(flowFile);
- logger.warn("Penalizing {} and routing to failure as
configured because file with the same name already exists", flowFile);
session.transfer(flowFile, REL_FAILURE);
continue;
+ } else if
(!share.folderExists(destinationFileParentDirectory)) {
+ try {
+ createMissingDirectoriesRecursively(logger, share,
destinationFileParentDirectory);
+ } catch (Exception e) {
+ logger.error("Penalizing {} and routing to failure
because failed to create missing destination directories ({})", flowFile,
destinationFileParentDirectory);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
}
- }
- // handle temporary suffix
- final String renameSuffixValue =
context.getProperty(RENAME_SUFFIX).getValue();
- final boolean renameSuffix = renameSuffixValue != null &&
!renameSuffixValue.isBlank();
- StringBuilder finalDestinationFullPath = new
StringBuilder(destinationFullPath);
- if (renameSuffix) {
- finalDestinationFullPath.append(renameSuffixValue);
- }
+ // handle conflict resolution
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+ if (share.fileExists(destinationFullPath)) {
+ if (conflictResolution.equals(IGNORE_RESOLUTION)) {
+ logger.info("Transferring {} to success as
configured because file with same name already exists", flowFile);
+ session.transfer(flowFile, REL_SUCCESS);
+ continue;
+ } else if (conflictResolution.equals(FAIL_RESOLUTION))
{
+ logger.warn("Penalizing {} and routing to failure
as configured because file with the same name already exists", flowFile);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
+ }
- // handle the transfer
- try (
- File shareDestinationFile = share.openFile(
- finalDestinationFullPath.toString(),
- EnumSet.of(AccessMask.GENERIC_WRITE),
- EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
- sharedAccess,
- SMB2CreateDisposition.FILE_OVERWRITE_IF,
- EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
- OutputStream shareDestinationFileOutputStream =
shareDestinationFile.getOutputStream()) {
- session.exportTo(flowFile,
shareDestinationFileOutputStream);
- } catch (Exception e) {
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- logger.error("Cannot transfer the file. Penalizing {} and
routing to 'failure'", flowFile, e);
- continue;
- }
+ // handle temporary suffix
+ final String renameSuffixValue =
context.getProperty(RENAME_SUFFIX).getValue();
+ final boolean renameSuffix = renameSuffixValue != null &&
!renameSuffixValue.isBlank();
+ StringBuilder finalDestinationFullPath = new
StringBuilder(destinationFullPath);
+ if (renameSuffix) {
+ finalDestinationFullPath.append(renameSuffixValue);
+ }
- // handle the rename
- if (renameSuffix) {
- try (DiskEntry fileDiskEntry = share.open(
- finalDestinationFullPath.toString(),
- EnumSet.of(AccessMask.DELETE,
AccessMask.GENERIC_WRITE),
- EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
- sharedAccess,
- SMB2CreateDisposition.FILE_OPEN,
- EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
-
- // normalize path slashes for the network share
- destinationFullPath = destinationFullPath.replace("/",
"\\");
-
- // rename the file on the share and replace it in case
it exists
- fileDiskEntry.rename(destinationFullPath, true);
+ // handle the transfer
+ try (
+ File shareDestinationFile = share.openFile(
+ finalDestinationFullPath.toString(),
+ EnumSet.of(AccessMask.GENERIC_WRITE),
+
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+ sharedAccess,
+ SMB2CreateDisposition.FILE_OVERWRITE_IF,
+
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH));
+ OutputStream shareDestinationFileOutputStream =
shareDestinationFile.getOutputStream()) {
+ session.exportTo(flowFile,
shareDestinationFileOutputStream);
} catch (Exception e) {
+ logger.error("Cannot transfer the file. Penalizing {}
and routing to 'failure'", flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
- logger.error("Cannot rename the file. Penalizing {}
and routing to 'failure'", flowFile, e);
continue;
}
- }
- // handle the success
- final URI provenanceUri = new URI("smb", hostname, "/" +
destinationFullPath.replace('\\', '/'), null);
- final long processingTimeInNano = System.nanoTime() -
processingStartTime;
- final long processingTimeInMilli =
TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
- session.getProvenanceReporter().send(flowFile,
provenanceUri.toString(), processingTimeInMilli);
- session.transfer(flowFile, REL_SUCCESS);
+ // handle the rename
+ if (renameSuffix) {
+ try (DiskEntry fileDiskEntry = share.open(
+ finalDestinationFullPath.toString(),
+ EnumSet.of(AccessMask.DELETE,
AccessMask.GENERIC_WRITE),
+
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+ sharedAccess,
+ SMB2CreateDisposition.FILE_OPEN,
+
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {
+
+ // normalize path slashes for the network share
+ destinationFullPath =
destinationFullPath.replace("/", "\\");
+
+ // rename the file on the share and replace it in
case it exists
+ fileDiskEntry.rename(destinationFullPath, true);
+ } catch (Exception e) {
+ logger.error("Cannot rename the file. Penalizing
{} and routing to 'failure'", flowFile, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
+ }
+
+ // handle the success
+ final URI provenanceUri = new URI("smb", hostname, "/" +
destinationFullPath.replace('\\', '/'), null);
+ final long processingTimeInNano = System.nanoTime() -
processingStartTime;
+ final long processingTimeInMilli =
TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS);
+ session.getProvenanceReporter().send(flowFile,
provenanceUri.toString(), processingTimeInMilli);
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ logger.error("Error processing flowfile", e);
Review Comment:
Do we want to include the flowfile?
```suggestion
logger.error("Error processing flowfile", flowFile, e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]