Repository: nifi Updated Branches: refs/heads/master 8f37ad451 -> 556f309df
http://git-wip-us.apache.org/repos/asf/nifi/blob/556f309d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index 97039e2..08a8ce2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Restricted; @@ -46,6 +47,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -112,9 +114,10 @@ public class FetchHDFS extends AbstractHadoopProcessor { } final FileSystem hdfs = getFileSystem(); + final UserGroupInformation ugi = getUserGroupInformation(); final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue(); - Path path = null; + final Path path; try { path = new Path(filenameValue); } catch (IllegalArgumentException e) { @@ -125,54 +128,64 @@ public class FetchHDFS extends AbstractHadoopProcessor { return; } - InputStream stream = null; - CompressionCodec codec = null; - Configuration conf = getConfiguration(); - final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf); - final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString()); - final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC; - - if(inferCompressionCodec) { - codec = compressionCodecFactory.getCodec(path); - } else if (compressionType != CompressionType.NONE) { - codec = getCompressionCodec(context, getConfiguration()); - } - final URI uri = path.toUri(); final StopWatch stopWatch = new StopWatch(true); - try { - - final String outputFilename; - final String originalFilename = path.getName(); - stream = hdfs.open(path, 16384); - - // Check if compression codec is defined (inferred or otherwise) - if (codec != null) { - stream = codec.createInputStream(stream); - outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension()); - } else { - outputFilename = originalFilename; + final FlowFile finalFlowFile = flowFile; + + ugi.doAs(new PrivilegedAction<Object>() { + @Override + public Object run() { + InputStream stream = null; + CompressionCodec codec = null; + Configuration conf = getConfiguration(); + final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf); + final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString()); + final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC; + + if(inferCompressionCodec) { + codec = compressionCodecFactory.getCodec(path); + } else if (compressionType != CompressionType.NONE) { + codec = getCompressionCodec(context, getConfiguration()); + } + + FlowFile flowFile = finalFlowFile; + try { + final String outputFilename; + final String originalFilename = path.getName(); + stream = hdfs.open(path, 16384); + + // Check if compression codec is defined (inferred or otherwise) + if (codec != null) { + stream = codec.createInputStream(stream); + outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension()); + } else { + outputFilename = originalFilename; + } + + flowFile = session.importFrom(stream, finalFlowFile); + flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename); + + stopWatch.stop(); + getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()}); + session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final FileNotFoundException | AccessControlException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e}); + flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } catch (final IOException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_COMMS_FAILURE); + } finally { + IOUtils.closeQuietly(stream); + } + + return null; } + }); - flowFile = session.importFrom(stream, flowFile); - flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename); - - stopWatch.stop(); - getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()}); - session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } catch (final FileNotFoundException | AccessControlException e) { - getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e}); - flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } catch (final IOException e) { - getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_COMMS_FAILURE); - } finally { - IOUtils.closeQuietly(stream); - } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/556f309d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java index aa49390..815b855 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java @@ -22,6 +22,7 @@ import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.util.CapturingLogger; import org.apache.nifi.util.NiFiProperties; @@ -151,7 +152,7 @@ public class MonitorMemoryTest { final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps); // build the system bundle - final Bundle bundle = ExtensionManager.createSystemBundle(nifiProperties); + final Bundle bundle = SystemBundle.create(nifiProperties); ExtensionManager.discoverExtensions(bundle, Collections.emptySet()); return new Tuple<>(FlowController.createStandaloneInstance(
