Repository: nifi
Updated Branches:
  refs/heads/master 13e42678b -> 883c223ce


NIFI-4386 Fixing connection handling in FetchFileTransfer

Signed-off-by: Pierre Villard <[email protected]>

This closes #2203.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/883c223c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/883c223c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/883c223c

Branch: refs/heads/master
Commit: 883c223ced84de2b9440ed3c5e3686533c03365f
Parents: 13e4267
Author: Bryan Bende <[email protected]>
Authored: Mon Oct 9 15:25:27 2017 -0400
Committer: Pierre Villard <[email protected]>
Committed: Tue Oct 10 08:50:57 2017 +0200

----------------------------------------------------------------------
 .../processors/standard/FetchFileTransfer.java  | 207 ++++++++++---------
 1 file changed, 110 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/883c223c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 0023c4b..b92ed98 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -17,20 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
@@ -50,6 +36,20 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.Tuple;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A base class for FetchSFTP, FetchFTP processors.
  *
@@ -230,97 +230,110 @@ public abstract class FetchFileTransfer extends 
AbstractProcessor {
             transfer = transferWrapper.getFileTransfer();
         }
 
-        // Pull data from remote system.
-        final InputStream in;
+        boolean closeConnection = false;
         try {
-            in = transfer.getInputStream(filename, flowFile);
-
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    StreamUtils.copy(in, out);
-                }
-            });
-
-            if(!transfer.flush(flowFile)) {
-                throw new IOException("completePendingCommand returned false, 
file transfer failed");
-            }
-
-            transferQueue.offer(new FileTransferIdleWrapper(transfer, 
System.nanoTime()));
-        } catch (final FileNotFoundException e) {
-            getLogger().error("Failed to fetch content for {} from filename {} 
on remote host {} because the file could not be found on the remote system; 
routing to {}",
-                new Object[] {flowFile, filename, host, 
REL_NOT_FOUND.getName()});
-            session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
-            session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
-            return;
-        } catch (final PermissionDeniedException e) {
-            getLogger().error("Failed to fetch content for {} from filename {} 
on remote host {} due to insufficient permissions; routing to {}",
-                new Object[] {flowFile, filename, host, 
REL_PERMISSION_DENIED.getName()});
-            session.transfer(session.penalize(flowFile), 
REL_PERMISSION_DENIED);
-            session.getProvenanceReporter().route(flowFile, 
REL_PERMISSION_DENIED);
-            return;
-        } catch (final ProcessException | IOException e) {
+            // Pull data from remote system.
+            final InputStream in;
             try {
-                transfer.close();
-            } catch (final IOException e1) {
-                getLogger().warn("Failed to close connection to {}:{} due to 
{}", new Object[] {host, port, e.toString()}, e);
-            }
-
-            getLogger().error("Failed to fetch content for {} from filename {} 
on remote host {}:{} due to {}; routing to comms.failure",
-                new Object[] {flowFile, filename, host, port, e.toString()}, 
e);
-            session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
-            return;
-        }
-
-        // Add FlowFile attributes
-        final String protocolName = transfer.getProtocolName();
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(protocolName + ".remote.host", host);
-        attributes.put(protocolName + ".remote.port", String.valueOf(port));
-        attributes.put(protocolName + ".remote.filename", filename);
-
-        if (filename.contains("/")) {
-            final String path = StringUtils.substringBeforeLast(filename, "/");
-            final String filenameOnly = 
StringUtils.substringAfterLast(filename, "/");
-            attributes.put(CoreAttributes.PATH.key(), path);
-            attributes.put(CoreAttributes.FILENAME.key(), filenameOnly);
-        } else {
-            attributes.put(CoreAttributes.FILENAME.key(), filename);
-        }
-        flowFile = session.putAllAttributes(flowFile, attributes);
+                in = transfer.getInputStream(filename, flowFile);
 
-        // emit provenance event and transfer FlowFile
-        session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + 
host + ":" + port + "/" + filename, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-        session.transfer(flowFile, REL_SUCCESS);
+                flowFile = session.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        StreamUtils.copy(in, out);
+                    }
+                });
 
-        // it is critical that we commit the session before moving/deleting 
the remote file. Otherwise, we could have a situation where
-        // we ingest the data, delete/move the remote file, and then NiFi 
dies/is shut down before the session is committed. This would
-        // result in data loss! If we commit the session first, we are safe.
-        session.commit();
+                if (!transfer.flush(flowFile)) {
+                    throw new IOException("completePendingCommand returned 
false, file transfer failed");
+                }
 
-        final String completionStrategy = 
context.getProperty(COMPLETION_STRATEGY).getValue();
-        if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) 
{
-            try {
-                transfer.deleteFile(flowFile, null, filename);
             } catch (final FileNotFoundException e) {
-                // file doesn't exist -- effectively the same as removing it. 
Move on.
-            } catch (final IOException ioe) {
-                getLogger().warn("Successfully fetched the content for {} from 
{}:{}{} but failed to remove the remote file due to {}",
-                    new Object[] {flowFile, host, port, filename, ioe}, ioe);
-            }
-        } else if 
(COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
-            String targetDir = 
context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
-            if (!targetDir.endsWith("/")) {
-                targetDir = targetDir + "/";
+                closeConnection = false;
+                getLogger().error("Failed to fetch content for {} from 
filename {} on remote host {} because the file could not be found on the remote 
system; routing to {}",
+                        new Object[]{flowFile, filename, host, 
REL_NOT_FOUND.getName()});
+                session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
+                session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
+                return;
+            } catch (final PermissionDeniedException e) {
+                closeConnection = false;
+                getLogger().error("Failed to fetch content for {} from 
filename {} on remote host {} due to insufficient permissions; routing to {}",
+                        new Object[]{flowFile, filename, host, 
REL_PERMISSION_DENIED.getName()});
+                session.transfer(session.penalize(flowFile), 
REL_PERMISSION_DENIED);
+                session.getProvenanceReporter().route(flowFile, 
REL_PERMISSION_DENIED);
+                return;
+            } catch (final ProcessException | IOException e) {
+                closeConnection = true;
+                getLogger().error("Failed to fetch content for {} from 
filename {} on remote host {}:{} due to {}; routing to comms.failure",
+                        new Object[]{flowFile, filename, host, port, 
e.toString()}, e);
+                session.transfer(session.penalize(flowFile), 
REL_COMMS_FAILURE);
+                return;
             }
-            final String simpleFilename = 
StringUtils.substringAfterLast(filename, "/");
-            final String target = targetDir + simpleFilename;
 
-            try {
-                transfer.rename(flowFile, filename, target);
-            } catch (final IOException ioe) {
-                getLogger().warn("Successfully fetched the content for {} from 
{}:{}{} but failed to rename the remote file due to {}",
-                    new Object[] {flowFile, host, port, filename, ioe}, ioe);
+            // Add FlowFile attributes
+            final String protocolName = transfer.getProtocolName();
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(protocolName + ".remote.host", host);
+            attributes.put(protocolName + ".remote.port", 
String.valueOf(port));
+            attributes.put(protocolName + ".remote.filename", filename);
+
+            if (filename.contains("/")) {
+                final String path = StringUtils.substringBeforeLast(filename, 
"/");
+                final String filenameOnly = 
StringUtils.substringAfterLast(filename, "/");
+                attributes.put(CoreAttributes.PATH.key(), path);
+                attributes.put(CoreAttributes.FILENAME.key(), filenameOnly);
+            } else {
+                attributes.put(CoreAttributes.FILENAME.key(), filename);
+            }
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            // emit provenance event and transfer FlowFile
+            session.getProvenanceReporter().fetch(flowFile, protocolName + 
"://" + host + ":" + port + "/" + filename, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+
+            // it is critical that we commit the session before 
moving/deleting the remote file. Otherwise, we could have a situation where
+            // we ingest the data, delete/move the remote file, and then NiFi 
dies/is shut down before the session is committed. This would
+            // result in data loss! If we commit the session first, we are 
safe.
+            session.commit();
+
+            final String completionStrategy = 
context.getProperty(COMPLETION_STRATEGY).getValue();
+            if 
(COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
+                try {
+                    transfer.deleteFile(flowFile, null, filename);
+                } catch (final FileNotFoundException e) {
+                    // file doesn't exist -- effectively the same as removing 
it. Move on.
+                } catch (final IOException ioe) {
+                    getLogger().warn("Successfully fetched the content for {} 
from {}:{}{} but failed to remove the remote file due to {}",
+                            new Object[]{flowFile, host, port, filename, ioe}, 
ioe);
+                }
+            } else if 
(COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
+                String targetDir = 
context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
+                if (!targetDir.endsWith("/")) {
+                    targetDir = targetDir + "/";
+                }
+                final String simpleFilename = 
StringUtils.substringAfterLast(filename, "/");
+                final String target = targetDir + simpleFilename;
+
+                try {
+                    transfer.rename(flowFile, filename, target);
+                } catch (final IOException ioe) {
+                    getLogger().warn("Successfully fetched the content for {} 
from {}:{}{} but failed to rename the remote file due to {}",
+                            new Object[]{flowFile, host, port, filename, ioe}, 
ioe);
+                }
+            }
+        } finally {
+            if (transfer != null) {
+                if (closeConnection) {
+                    getLogger().debug("Closing FileTransfer...");
+                    try {
+                        transfer.close();
+                    } catch (final IOException e) {
+                        getLogger().warn("Failed to close connection to {}:{} 
due to {}", new Object[]{host, port, e.getMessage()}, e);
+                    }
+                } else {
+                    getLogger().debug("Returning FileTransfer to pool...");
+                    transferQueue.offer(new FileTransferIdleWrapper(transfer, 
System.nanoTime()));
+                }
             }
         }
     }

Reply via email to