gresockj commented on a change in pull request #5042:
URL: https://github.com/apache/nifi/pull/5042#discussion_r625882748
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
##########
@@ -306,37 +306,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
         // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
         // result in data loss! If we commit the session first, we are safe.
-        session.commit();
-
-        final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
-        if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
-            try {
-                transfer.deleteFile(flowFile, null, filename);
-            } catch (final FileNotFoundException e) {
-                // file doesn't exist -- effectively the same as removing it. Move on.
-            } catch (final IOException ioe) {
-                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
-                    new Object[]{flowFile, host, port, filename, ioe}, ioe);
-            }
-        } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
-            final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
-            final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
-
-            try {
-                final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
-                final File targetFile = new File(absoluteTargetDirPath, simpleFilename);
-                if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
-                    // Create the target directory if necessary.
-                    transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile());
-                }
-
-                transfer.rename(flowFile, filename, targetFile.getAbsolutePath());
-
-            } catch (final IOException ioe) {
-                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
-                    new Object[]{flowFile, host, port, filename, ioe}, ioe);
-            }
-        }
+        final FlowFile flowFileReceived = flowFile;
+        session.commitAsync(() -> performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port));
Review comment:
FWIW though, runtime testing doesn't expose any issues, even at a high
thread count and throughput.
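
For reference, the ordering that the code comment in the hunk insists on (commit first, then delete or move the remote file) can be illustrated with a toy model. This is only a sketch: `ToySession`, `CommitOrderingSketch`, and the file name are hypothetical stand-ins, not NiFi classes, and the toy runs the callback synchronously, so it says nothing about how NiFi actually schedules the `commitAsync` callback or about thread safety.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOrderingSketch {

    /** Hypothetical stand-in for a session; this is NOT the NiFi ProcessSession API. */
    static class ToySession {
        private final List<String> events;

        ToySession(final List<String> events) {
            this.events = events;
        }

        // Records the commit first, then runs the callback, so the callback
        // can never observe an uncommitted session in this toy model.
        void commitAsync(final Runnable onSuccess) {
            events.add("commit");
            onSuccess.run();
        }
    }

    // Simulates one fetch and returns the events in the order they happened.
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        final ToySession session = new ToySession(events);
        final String filename = "/remote/data.txt"; // illustrative value only

        // The remote cleanup is passed as a callback instead of being run inline.
        session.commitAsync(() -> events.add("delete " + filename));
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the "delete" event is always recorded after "commit", which is the property the original comment cares about: if the process dies before the commit, the remote file has not yet been touched.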