Repository: nifi
Updated Branches:
  refs/heads/master 8f37ad451 -> 556f309df


http://git-wip-us.apache.org/repos/asf/nifi/blob/556f309d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 97039e2..08a8ce2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Restricted;
@@ -46,6 +47,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -112,9 +114,10 @@ public class FetchHDFS extends AbstractHadoopProcessor {
         }
 
         final FileSystem hdfs = getFileSystem();
+        final UserGroupInformation ugi = getUserGroupInformation();
         final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
 
-        Path path = null;
+        final Path path;
         try {
             path = new Path(filenameValue);
         } catch (IllegalArgumentException e) {
@@ -125,54 +128,64 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        InputStream stream = null;
-        CompressionCodec codec = null;
-        Configuration conf = getConfiguration();
-        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
-        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
-        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
-
-        if(inferCompressionCodec) {
-            codec = compressionCodecFactory.getCodec(path);
-        } else if (compressionType != CompressionType.NONE) {
-            codec = getCompressionCodec(context, getConfiguration());
-        }
-
         final URI uri = path.toUri();
         final StopWatch stopWatch = new StopWatch(true);
-        try {
-
-            final String outputFilename;
-            final String originalFilename = path.getName();
-            stream = hdfs.open(path, 16384);
-
-            // Check if compression codec is defined (inferred or otherwise)
-            if (codec != null) {
-                stream = codec.createInputStream(stream);
-                outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
-            } else {
-                outputFilename = originalFilename;
+        final FlowFile finalFlowFile = flowFile;
+
+        ugi.doAs(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                InputStream stream = null;
+                CompressionCodec codec = null;
+                Configuration conf = getConfiguration();
+                final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
+                final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+                final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+
+                if(inferCompressionCodec) {
+                    codec = compressionCodecFactory.getCodec(path);
+                } else if (compressionType != CompressionType.NONE) {
+                    codec = getCompressionCodec(context, getConfiguration());
+                }
+
+                FlowFile flowFile = finalFlowFile;
+                try {
+                    final String outputFilename;
+                    final String originalFilename = path.getName();
+                    stream = hdfs.open(path, 16384);
+
+                    // Check if compression codec is defined (inferred or otherwise)
+                    if (codec != null) {
+                        stream = codec.createInputStream(stream);
+                        outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+                    } else {
+                        outputFilename = originalFilename;
+                    }
+
+                    flowFile = session.importFrom(stream, finalFlowFile);
+                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
+
+                    stopWatch.stop();
+                    getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
+                    session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                    session.transfer(flowFile, REL_SUCCESS);
+                } catch (final FileNotFoundException | AccessControlException e) {
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
+                    flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_FAILURE);
+                } catch (final IOException e) {
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_COMMS_FAILURE);
+                } finally {
+                    IOUtils.closeQuietly(stream);
+                }
+
+                return null;
             }
+        });
 
-            flowFile = session.importFrom(stream, flowFile);
-            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
-
-            stopWatch.stop();
-            getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
-            session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-        } catch (final FileNotFoundException | AccessControlException e) {
-            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
-            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-        } catch (final IOException e) {
-            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_COMMS_FAILURE);
-        } finally {
-            IOUtils.closeQuietly(stream);
-        }
     }
 
 }

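The key change in the hunk above is that the HDFS read now runs inside UserGroupInformation.doAs(...), so it executes as the processor's configured identity (for example a Kerberos principal logged in from a keytab) rather than the JVM's current login user. Below is a minimal standalone sketch of that pattern, assuming a hypothetical principal, keytab path, and HDFS path; it illustrates the doAs idiom and is not code from this commit.

    import java.io.IOException;
    import java.io.InputStream;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiFetchSketch {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            UserGroupInformation.setConfiguration(conf);

            // Hypothetical principal and keytab; not values from this commit.
            final UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                    "nifi@EXAMPLE.COM", "/etc/security/keytabs/nifi.keytab");

            // Everything inside run() executes as the logged-in user, the
            // same idea as the ugi.doAs(...) wrapper added to FetchHDFS.
            final long byteCount = ugi.doAs(new PrivilegedExceptionAction<Long>() {
                @Override
                public Long run() throws IOException {
                    long total = 0;
                    try (FileSystem fs = FileSystem.get(conf);
                         InputStream in = fs.open(new Path("/tmp/example.txt"), 16384)) {
                        final byte[] buffer = new byte[8192];
                        int read;
                        while ((read = in.read(buffer)) != -1) {
                            total += read;
                        }
                    }
                    return total;
                }
            });

            System.out.println("Read " + byteCount + " bytes");
        }
    }

Note the sketch uses PrivilegedExceptionAction, which lets checked IOExceptions propagate out of doAs; the processor above instead uses PrivilegedAction<Object> and handles exceptions inside run() so it can route the FlowFile to the appropriate relationship.
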
http://git-wip-us.apache.org/repos/asf/nifi/blob/556f309d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
index aa49390..815b855 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
@@ -22,6 +22,7 @@ import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.SystemBundle;
 import org.apache.nifi.provenance.MockProvenanceRepository;
 import org.apache.nifi.util.CapturingLogger;
 import org.apache.nifi.util.NiFiProperties;
@@ -151,7 +152,7 @@ public class MonitorMemoryTest {
         final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps);
 
         // build the system bundle
-        final Bundle bundle = ExtensionManager.createSystemBundle(nifiProperties);
+        final Bundle bundle = SystemBundle.create(nifiProperties);
         ExtensionManager.discoverExtensions(bundle, Collections.emptySet());
 
         return new Tuple<>(FlowController.createStandaloneInstance(

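The second hunk tracks a refactor that moves system-bundle creation out of ExtensionManager into a dedicated SystemBundle factory. A sketch of the updated test bootstrap, assembled from the context lines above (addProps comes from the surrounding test setup):

    // Build basic properties, then create the system bundle via the new factory.
    final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps);

    // Previously: ExtensionManager.createSystemBundle(nifiProperties)
    final Bundle systemBundle = SystemBundle.create(nifiProperties);
    ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());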