gresockj commented on a change in pull request #5042:
URL: https://github.com/apache/nifi/pull/5042#discussion_r625882748



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
##########
@@ -306,37 +306,8 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             // it is critical that we commit the session before 
moving/deleting the remote file. Otherwise, we could have a situation where
             // we ingest the data, delete/move the remote file, and then NiFi 
dies/is shut down before the session is committed. This would
             // result in data loss! If we commit the session first, we are 
safe.
-            session.commit();
-
-            final String completionStrategy = 
context.getProperty(COMPLETION_STRATEGY).getValue();
-            if 
(COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
-                try {
-                    transfer.deleteFile(flowFile, null, filename);
-                } catch (final FileNotFoundException e) {
-                    // file doesn't exist -- effectively the same as removing 
it. Move on.
-                } catch (final IOException ioe) {
-                    getLogger().warn("Successfully fetched the content for {} 
from {}:{}{} but failed to remove the remote file due to {}",
-                            new Object[]{flowFile, host, port, filename, ioe}, 
ioe);
-                }
-            } else if 
(COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
-                final String targetDir = 
context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
-                final String simpleFilename = 
StringUtils.substringAfterLast(filename, "/");
-
-                try {
-                    final String absoluteTargetDirPath = 
transfer.getAbsolutePath(flowFile, targetDir);
-                    final File targetFile = new File(absoluteTargetDirPath, 
simpleFilename);
-                    if 
(context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
-                        // Create the target directory if necessary.
-                        transfer.ensureDirectoryExists(flowFile, 
targetFile.getParentFile());
-                    }
-
-                    transfer.rename(flowFile, filename, 
targetFile.getAbsolutePath());
-
-                } catch (final IOException ioe) {
-                    getLogger().warn("Successfully fetched the content for {} 
from {}:{}{} but failed to rename the remote file due to {}",
-                            new Object[]{flowFile, host, port, filename, ioe}, 
ioe);
-                }
-            }
+            final FlowFile flowFileReceived = flowFile;
+            session.commitAsync(() -> performCompletionStrategy(transfer, 
context, flowFileReceived, filename, host, port));

Review comment:
       FWIW though, runtime testing doesn't expose any issues, even at a high 
thread count and throughput.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to