[
https://issues.apache.org/jira/browse/NIFI-4906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514823#comment-16514823
]
ASF GitHub Bot commented on NIFI-4906:
--------------------------------------
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2639#discussion_r195905944
--- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java ---
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files and directories from
HDFS. "
+ + "This processor creates a FlowFile(s) that represents the HDFS
file/dir with relevant information. "
+ + "Main purpose of this processor to provide functionality similar
to HDFS Client, i.e. count, du, ls, test, etc. "
+ + "Unlike ListHDFS, this processor is stateless, supports incoming
connections and provides information on a dir level. "
+ )
+@WritesAttributes({
+    @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."),
+    @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. "
+            + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"),
+    @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"),
+    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS"),
+    @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS"),
+    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+    @WritesAttribute(attribute="hdfs.length", description=""
+            + "In case of files: the number of bytes in the file in HDFS. "
+            + "In case of dirs: the storage space consumed by the directory. "
+            + ""),
+    @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory', will represent the total count of files under this dir. "
+            + "Won't be populated for other types of HDFS objects. "),
+    @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory', will represent the total count of directories under this dir (including itself). "
+            + "Won't be populated for other types of HDFS objects. "),
+    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"),
+    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, "
+            + "3 for the group, and 3 for other users. For example rw-rw-r--"),
+    @WritesAttribute(attribute="hdfs.status", description="The status contains a comma-separated list of file/dir paths which couldn't be listed/accessed. "
+            + "Status won't be set if no errors occurred."),
+    @WritesAttribute(attribute="hdfs.full.tree", description="When destination is 'attributes', will be populated with the full tree of the HDFS directory in JSON format.")
+})
+@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class})
+public class GetHDFSFileInfo extends AbstractHadoopProcessor {
+    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+    public static final PropertyDescriptor FULL_PATH = new PropertyDescriptor.Builder()
+            .displayName("Full path")
+            .name("gethdfsfileinfo-full-path")
+            .description("A directory to start listing from, or a file's full path.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
+            .displayName("Recurse Subdirectories")
+            .name("gethdfsfileinfo-recurse-subdirs")
+            .description("Indicates whether to list files from subdirectories of the HDFS directory")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DIR_FILTER = new PropertyDescriptor.Builder()
+            .displayName("Directory Filter")
+            .name("gethdfsfileinfo-dir-filter")
+            .description("Regex. Only directories whose names match the given regular expression will be picked up. If not provided, no filter will be applied (for performance reasons).")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+            .displayName("File Filter")
+            .name("gethdfsfileinfo-file-filter")
+            .description("Regex. Only files whose names match the given regular expression will be picked up. If not provided, no filter will be applied (for performance reasons).")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new PropertyDescriptor.Builder()
+            .displayName("Exclude Files")
+            .name("gethdfsfileinfo-file-exclude-filter")
+            .description("Regex. Files whose names match the given regular expression will not be picked up. If not provided, no exclusion filter will be applied (for performance reasons).")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new PropertyDescriptor.Builder()
+            .displayName("Ignore Dotted Directories")
+            .name("gethdfsfileinfo-ignore-dotted-dirs")
+            .description("If true, directories whose names begin with a dot (\".\") will be ignored")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+            .displayName("Ignore Dotted Files")
+            .name("gethdfsfileinfo-ignore-dotted-files")
+            .description("If true, files whose names begin with a dot (\".\") will be ignored")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final AllowableValue GROUP_ALL = new AllowableValue("gethdfsfileinfo-group-all", "All",
+            "Group all results into a single flowfile.");
+
+    static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory",
+            "Group HDFS objects by their parent directories only. The processor will generate a flowfile for each directory (if recursive). "
+            + "If the 'Recurse Subdirectories' property is set to 'false', this will have the same effect as 'All'.");
+
+    static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None",
+            "Don't group results. Generates a flowfile for each HDFS object.");
+
+    public static final PropertyDescriptor GROUPING = new PropertyDescriptor.Builder()
+            .displayName("Group Results")
+            .name("gethdfsfileinfo-group")
+            .description("Groups HDFS objects")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE)
+            .defaultValue(GROUP_ALL.getValue())
+            .build();
+
+    static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes",
+            "Details of the given HDFS object will be stored in flowfile attributes.");
+
+    static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content",
+            "Details of the given HDFS object will be stored in the flowfile content, in JSON format.");
+
+    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .displayName("Destination")
+            .name("gethdfsfileinfo-destination")
+            .description("Sets the destination for the results. When set to 'Content', flowfile attributes won't be used for storing results.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
+            .defaultValue(DESTINATION_CONTENT.getValue())
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All successfully generated FlowFiles are transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("If no objects are found, the original FlowFile is transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Original FlowFiles are transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All failed attempts to access HDFS will be routed to this relationship")
+            .build();
+
+ private HDFSFileInfoRequest req;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ super.init(context);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> props = new ArrayList<>(properties);
+ props.add(FULL_PATH);
+ props.add(RECURSE_SUBDIRS);
+ props.add(DIR_FILTER);
+ props.add(FILE_FILTER);
+ props.add(FILE_EXCLUDE_FILTER);
+ props.add(IGNORE_DOTTED_DIRS);
+ props.add(IGNORE_DOTTED_FILES);
+ props.add(GROUPING);
+ props.add(DESTINATION);
+ return props;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_NOT_FOUND);
+ relationships.add(REL_ORIGINAL);
+ relationships.add(REL_FAILURE);
+ return relationships;
+ }
+
+ @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+ return super.customValidate(context);
+ }
+
+ @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ // drop request details to rebuild it
+ req = null;
+ }
+
+ @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile ff = null;
+ if (context.hasIncomingConnection()) {
+ ff = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops, then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+ if (ff == null && context.hasNonLoopConnection()) {
+ context.yield();
+ return;
+ }
+ }
+ boolean scheduledFF = false;
+ if (ff == null) {
+ ff = session.create();
+ scheduledFF = true;
+ }
+ try {
+            if (req == null) {
+                //rebuild the request details based on current properties
+                req = buildRequestDetails(context, session, ff);
+            }else {
+                //avoid rebuilding the req object's patterns in order to have better performance
+                req = updateRequestDetails(context, session, ff);
+            }
+ }catch(Exception e) {
+ getLogger().error("Invalid properties: ", e);
+ if (scheduledFF) {
+ session.remove(ff);
+ context.yield();
+ }else {
+ ff = session.penalize(ff);
+ session.rollback();
+ }
+ return;
+ }
+ try {
+ final FileSystem hdfs = getFileSystem();
+ UserGroupInformation ugi = getUserGroupInformation();
+            HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false);
+ if (res == null) {
+ session.transfer(ff, REL_NOT_FOUND);
+ return;
+ }
+ session.transfer(ff, REL_ORIGINAL);
+        } catch (final IOException | IllegalArgumentException e) {
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
+            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
+            session.transfer(ff, REL_FAILURE);
+            return;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            getLogger().error("Interrupted while performing listing of HDFS", e);
+            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
+            session.transfer(ff, REL_FAILURE);
+            return;
+        } catch (final Exception e) {
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
+            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
+            session.transfer(ff, REL_FAILURE);
+            return;
+        }
+ }
+
+
+    /*
+     * Walks through the HDFS tree. Returns null if the provided path does not exist.
+     */
+    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, FlowFile origFF, final FileSystem hdfs,
+            final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent, final boolean statsOnly) throws Exception {
+
+ final HDFSObjectInfoDetails p = parent;
+
+        if (!ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(p != null ? p.getPath() : new Path(req.fullPath)))) {
+            return null;
+        }
+
+        if (parent == null) {
+            parent = new HDFSObjectInfoDetails(ugi.doAs((PrivilegedExceptionAction<FileStatus>) () -> hdfs.getFileStatus(new Path(req.fullPath))));
+        }
+ if (parent.isFile() && p == null) {
+ //single file path requested and found, lets send to output:
+ processHDFSObject(context, session, origFF, req, parent, true);
+ return parent;
+ }
+
+ final Path path = parent.getPath();
+
+        FileStatus[] listFSt = null;
+        try {
+            listFSt = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
+        }catch (IOException e) {
+            parent.error = "Couldn't list directory: " + e;
+            processHDFSObject(context, session, origFF, req, parent, p == null);
+            return parent; //File not found exception, or access denied - don't interrupt, just don't list
+        }
+        if (listFSt != null) {
+            for (FileStatus f : listFSt) {
+                HDFSObjectInfoDetails o = new HDFSObjectInfoDetails(f);
+                HDFSObjectInfoDetails vo = validateMatchingPatterns(o, req);
+                if (o.isDirectory() && !o.isSymlink() && req.isRecursive) {
+                    o = walkHDFSTree(context, session, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
+                    parent.countDirs += o.countDirs;
+                    parent.totalLen += o.totalLen;
+                    parent.countFiles += o.countFiles;
+                }else if (o.isDirectory() && o.isSymlink()) {
+                    parent.countDirs += 1;
+                }else if (o.isFile() && !o.isSymlink()) {
+                    parent.countFiles += 1;
+                    parent.totalLen += o.getLen();
+                }else if (o.isFile() && o.isSymlink()) {
+                    parent.countFiles += 1; // do not add the length of the symlink, as it doesn't consume space under THIS directory; but do count the file, as it is still an object.
+                }
+
+                // Decide what to do with the child: if a FF per object or per dir was requested, just emit a new FF with the info from the 'o' object
+                if (vo != null && !statsOnly) {
+                    parent.addChild(vo);
+                    if (p != null && req.isRecursive && vo.isFile() && !vo.isSymlink()) {
+                        processHDFSObject(context, session, origFF, req, vo, false);
+                    }
+                }
+            }
+            if (!statsOnly) {
+                processHDFSObject(context, session, origFF, req, parent, p==null);
+            }
+            if (req.groupping != Groupping.ALL) {
+                parent.setChildren(null); //we need children in the full tree only when a single output is requested.
+            }
+        }
+
+ return parent;
+ }
+
+    protected HDFSObjectInfoDetails validateMatchingPatterns(final HDFSObjectInfoDetails o, HDFSFileInfoRequest req) {
+        if (o == null || o.getPath() == null) {
+            return null;
+        }
+
+        if (o.isFile()) {
+            if (req.isIgnoreDotFiles && o.getPath().getName().startsWith(".")) {
+                return null;
+            }else if (req.fileExcludeFilter != null && req.fileExcludeFilter.matcher(o.getPath().getName()).matches()) {
+                return null;
+            }else if (req.fileFilter == null) {
+                return o;
+            }else if (req.fileFilter != null && req.fileFilter.matcher(o.getPath().getName()).matches()) {
+                return o;
+            }else {
+                return null;
+            }
+        }
+        if (o.isDirectory()) {
+            if (req.isIgnoreDotDirs && o.getPath().getName().startsWith(".")) {
+                return null;
+            }else if (req.dirFilter == null) {
+                return o;
+            }else if (req.dirFilter != null && req.dirFilter.matcher(o.getPath().getName()).matches()) {
+                return o;
+            }else {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /*
+     * Checks whether an HDFS object should be sent to output.
+     * If it should, a new flowfile will be created, and its content and attributes will be populated according to the other request params.
+     */
+    protected HDFSObjectInfoDetails processHDFSObject(final ProcessContext context, final ProcessSession session,
+            FlowFile origFF, final HDFSFileInfoRequest req, final HDFSObjectInfoDetails o, final boolean isRoot) {
+        if (o.isFile() && req.groupping != Groupping.NONE) {
+            return null; //there is grouping by either root directory or every directory; no need to print separate files.
+        }
+        if (o.isDirectory() && o.isSymlink() && req.groupping != Groupping.NONE) {
+            return null; //ignore symlink dirs when grouping
+        }
+        if (o.isDirectory() && req.groupping == Groupping.ALL && !isRoot) {
+            return null;
+        }
+        FlowFile ff = session.create(origFF);
+        //won't combine conditions for similar actions, for better readability and maintenance.
+        if (o.isFile() && isRoot && req.isDestContent) {
+            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+        // ------------------------------
+        }else if (o.isFile() && isRoot && !req.isDestContent) {
+            ff = session.putAllAttributes(ff, o.toAttributesMap());
+        // ------------------------------
+        }else if (o.isFile() && req.isDestContent) {
+            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+        // ------------------------------
+        }else if (o.isFile() && !req.isDestContent) {
+            ff = session.putAllAttributes(ff, o.toAttributesMap());
+        // ------------------------------
+        }else if (o.isDirectory() && o.isSymlink() && req.isDestContent) {
+            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+        // ------------------------------
+        }else if (o.isDirectory() && o.isSymlink() && !req.isDestContent) {
+            ff = session.putAllAttributes(ff, o.toAttributesMap());
+        // ------------------------------
+        }else if (o.isDirectory() && req.groupping == Groupping.NONE && req.isDestContent) {
+            o.setChildren(null);
+            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+        // ------------------------------
+        }else if (o.isDirectory() && req.groupping == Groupping.NONE && !req.isDestContent) {
+            ff = session.putAllAttributes(ff, o.toAttributesMap());
+        // ------------------------------
+        }else if (o.isDirectory() && req.groupping == Groupping.DIR && req.isDestContent) {
+            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+        // ------------------------------
+        }else if (o.isDirectory() && req.groupping == Groupping.DIR && !req.isDestContent) {
+            ff = session.putAllAttributes(ff, o.toAttributesMap());
+            ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
+        // ------------------------------
+        }else if (o.isDirectory() && req.groupping == Groupping.ALL && req.isDestContent) {
+            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
+        // ------------------------------
+        }else if (o.isDirectory() && req.groupping == Groupping.ALL && !req.isDestContent) {
+            ff = session.putAllAttributes(ff, o.toAttributesMap());
+            ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
+        }else {
+            getLogger().error("Illegal State!");
+            session.remove(ff);
+            return null;
+        }
+
+        session.transfer(ff, REL_SUCCESS);
+        return o;
+    }
+
+ /*
+ * Returns permissions in readable format like rwxr-xr-x (755)
+ */
+ protected String getPerms(final FsPermission permission) {
+
+ final StringBuilder sb = new StringBuilder();
+        for (FsAction action : new FsAction[]{permission.getUserAction(), permission.getGroupAction(), permission.getOtherAction()}) {
+ if (action.implies(FsAction.READ)) {
+ sb.append("r");
+ } else {
+ sb.append("-");
+ }
+
+ if (action.implies(FsAction.WRITE)) {
+ sb.append("w");
+ } else {
+ sb.append("-");
+ }
+
+ if (action.implies(FsAction.EXECUTE)) {
+ sb.append("x");
+ } else {
+ sb.append("-");
+ }
+ }
+
+ return sb.toString();
+ }
+
+    /*
+     * Creates the internal request object and initializes the fields that won't change on every call (onTrigger).
+     * Dynamic fields will be updated on each call separately.
+     */
+    protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
+        HDFSFileInfoRequest req = new HDFSFileInfoRequest();
+        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
+ req.isRecursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+
+ PropertyValue pv = null;
+ String v = null;
+
+        if (context.getProperty(DIR_FILTER).isSet() && (pv=context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff))!=null) {
+            v = pv.getValue();
+            req.dirFilter = v == null ? null : Pattern.compile(v);
+        }
+
+        if (context.getProperty(FILE_FILTER).isSet() && (pv=context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff))!=null) {
+            v = pv.getValue();
+            req.fileFilter = v == null ? null : Pattern.compile(v);
+        }
+
+        if (context.getProperty(FILE_EXCLUDE_FILTER).isSet() && (pv=context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff))!=null) {
+            v = pv.getValue();
+            req.fileExcludeFilter = v == null ? null : Pattern.compile(v);
+        }
+
+        req.isIgnoreDotFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+        req.isIgnoreDotDirs = context.getProperty(IGNORE_DOTTED_DIRS).asBoolean();
+
+        req.groupping = HDFSFileInfoRequest.Groupping.getEnum(context.getProperty(GROUPING).getValue());
+
+        v = context.getProperty(DESTINATION).getValue();
+        if (DESTINATION_CONTENT.getValue().equals(v)) {
+            req.isDestContent = true;
+        }else {
+            req.isDestContent = false;
+        }
+
+ return req;
+ }
+
+    /*
+     * Creates the internal request object if it wasn't created previously, and updates it with dynamic properties every time onTrigger is called.
+     * Avoids creating regex Pattern objects unless their actual values have changed due to EL evaluation.
+     */
+    protected HDFSFileInfoRequest updateRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
+
+        if (req == null) {
+            return buildRequestDetails(context, session, ff);
+        }
+        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
+
+ String currValue = null;
+ String oldValue = null;
+
+        currValue = context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff).getValue();
+        oldValue = req.dirFilter == null ? null : req.dirFilter.toString();
+        if (StringUtils.compare(currValue, oldValue) != 0) {
+            req.dirFilter = currValue == null ? null : Pattern.compile(currValue);
+        }
+
+        currValue = context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff).getValue();
+        oldValue = req.fileFilter == null ? null : req.fileFilter.toString();
+        if (StringUtils.compare(currValue, oldValue) != 0) {
+            req.fileFilter = currValue == null ? null : Pattern.compile(currValue);
+        }
+
+        currValue = context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff).getValue();
+        oldValue = req.fileExcludeFilter == null ? null : req.fileExcludeFilter.toString();
+        if (StringUtils.compare(currValue, oldValue) != 0) {
+            req.fileExcludeFilter = currValue == null ? null : Pattern.compile(currValue);
+        }
+
+ return req;
+ }
+
+ /*
+     * Keeps all request details in a single object.
+ */
+ static class HDFSFileInfoRequest{
+ enum Groupping {
+ ALL(GROUP_ALL.getValue()),
+ DIR(GROUP_PARENT_DIR.getValue()),
+ NONE(GROUP_NONE.getValue());
+
+ private String val;
+
+ Groupping(String val){
+ this.val = val;
+ }
+
+ public String toString() {
+ return this.val;
+ }
+
+ public static Groupping getEnum(String value) {
+ for (Groupping v : values()) {
+ if (v.val.equals(value)) {
+ return v;
+ }
+ }
+ return null;
+ }
+ }
+
+ String fullPath;
+ boolean isRecursive;
+ Pattern dirFilter;
+ Pattern fileFilter;
+ Pattern fileExcludeFilter;
+ boolean isIgnoreDotFiles;
+ boolean isIgnoreDotDirs;
+ boolean isDestContent;
+ Groupping groupping;
+ }
+
+    /*
+     * Keeps details of HDFS objects.
+     * This class is based on FileStatus and adds additional properties: file/dir counts, total size of directories, and the subtree/hierarchy of recursive listings.
+     */
+ class HDFSObjectInfoDetails extends FileStatus{
+
+ private long countFiles;
+ private long countDirs = 1;
+ private long totalLen;
+        private Collection<HDFSObjectInfoDetails> children = new LinkedList<>();
+ private String error;
+
+ HDFSObjectInfoDetails(FileStatus fs) throws IOException{
+ super(fs);
+ }
+
+ public long getCountFiles() {
+ return countFiles;
+ }
+
+ public void setCountFiles(long countFiles) {
+ this.countFiles = countFiles;
+ }
+
+ public long getCountDirs() {
+ return countDirs;
+ }
+
+ public void setCountDirs(long countDirs) {
+ this.countDirs = countDirs;
+ }
+
+ public long getTotalLen() {
+ return totalLen;
+ }
+
+ public void setTotalLen(long totalLen) {
+ this.totalLen = totalLen;
+ }
+
+ public Collection<HDFSObjectInfoDetails> getChildren() {
+ return children;
+ }
+
+ public String getError() {
+ return error;
+ }
+
+ public void setError(String error) {
+ this.error = error;
+ }
+
+        public void setChildren(Collection<HDFSObjectInfoDetails> children) {
+ this.children = children;
+ }
+
+ public void addChild(HDFSObjectInfoDetails child) {
+ this.children.add(child);
+ }
+
+ public void updateTotals(boolean deepUpdate) {
+ if (deepUpdate) {
+ this.countDirs = 1;
+ this.countFiles = 0;
+ this.totalLen = 0;
+ }
+
+            for(HDFSObjectInfoDetails c : children) {
+                if (c.isSymlink()) {
+                    continue; //do not count symlinks: they will either be counted under their actual directories, or won't be counted if the actual location is not under the provided root of the scan.
+                }else if (c.isDirectory()) {
+                    if (deepUpdate) {
+                        c.updateTotals(deepUpdate);
+                    }
+                    this.totalLen += c.totalLen;
+                    this.countDirs += c.countDirs;
+                    this.countFiles += c.countFiles;
+                }else if (c.isFile()) {
+                    this.totalLen += c.getLen();
+                    this.countFiles++;
+                }
+            }
+
+ }
+
+        /*
+         * Since, by definition, a FF will keep attributes only for the parent/single object, we don't need to recurse into the children.
+         */
+        public Map<String, String> toAttributesMap(){
--- End diff ---

toAttributesMap isn't used when destination is content (toJsonString is being used instead). But I do agree on this miss - going to add it under processHDFSObject when I create a new FF.
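
For context, a minimal sketch of what toAttributesMap could look like, derived from the hdfs.* attributes declared in @WritesAttributes above (the diff is truncated before the method body, so this is an assumption, not the actual implementation):

    public Map<String, String> toAttributesMap() {
        // Hypothetical sketch; keys mirror the @WritesAttributes contract.
        final Map<String, String> map = new HashMap<>();
        map.put("hdfs.objectName", getPath().getName());
        map.put("hdfs.path", getPath().getParent() == null ? "/" : getPath().getParent().toString());
        map.put("hdfs.type", isSymlink() ? "link" : isDirectory() ? "directory" : "file");
        map.put("hdfs.owner", getOwner());
        map.put("hdfs.group", getGroup());
        map.put("hdfs.lastModified", Long.toString(getModificationTime()));
        // For dirs, hdfs.length is the accumulated size of the subtree.
        map.put("hdfs.length", Long.toString(isDirectory() ? totalLen : getLen()));
        map.put("hdfs.replication", Short.toString(getReplication()));
        map.put("hdfs.permissions", getPerms(getPermission()));
        if (isDirectory()) {
            map.put("hdfs.count.files", Long.toString(countFiles));
            map.put("hdfs.count.dirs", Long.toString(countDirs));
        }
        if (error != null) {
            map.put("hdfs.status", error);
        }
        return map;
    }
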
> Add GetHdfsFileInfo Processor
> -----------------------------
>
> Key: NIFI-4906
> URL: https://issues.apache.org/jira/browse/NIFI-4906
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Ed Berezitsky
> Assignee: Ed Berezitsky
> Priority: Major
> Labels: patch, pull-request-available
> Attachments: NiFi-GetHDFSFileInfo.pdf, gethdfsfileinfo.patch
>
>
> Add *GetHdfsFileInfo* Processor to be able to get stats from a file system.
> This processor should support recursive scans, getting information on
> directories and files.
> _File-level info required_: name, path, length, modified timestamp, last
> access timestamp, owner, group, permissions.
> _Directory-level info required_: name, path, sum of lengths of files under a
> dir, count of files under a dir, modified timestamp, last access timestamp,
> owner, group, permissions.
>
> The result is returned:
> * in a single flow file (in content - a json line per file/dir info);
> * a flow file per file/dir info (in content as a json obj, or in a set of
> attributes, by choice).
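> For illustration only - a single json line per object might look like the
> following (hypothetical values; the exact field set would follow the
> attribute list above):
> {"objectName":"foo","path":"/bar","type":"directory","owner":"hdfs","group":"hadoop","lastModified":1520000000000,"length":123456,"count.files":42,"count.dirs":7,"permissions":"rwxr-xr-x"}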
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)