[ https://issues.apache.org/jira/browse/NIFI-4906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508398#comment-16508398 ]

ASF GitHub Bot commented on NIFI-4906:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2639#discussion_r194437894
  
    --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java ---
    @@ -0,0 +1,803 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import java.io.IOException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsAction;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_ALLOWED)
    +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files and directories from HDFS. "
    +        + "This processor creates FlowFiles that represent the HDFS file/dir with relevant information. "
    +        + "The main purpose of this processor is to provide functionality similar to the HDFS client, i.e. count, du, ls, test, etc. "
    +        + "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a directory level. "
    +        )
    +@WritesAttributes({
    +    @WritesAttribute(attribute="hdfs.objectName", description="The name of 
the file/dir found on HDFS."),
    +    @WritesAttribute(attribute="hdfs.path", description="The path is set 
to the absolute path of the object's parent directory on HDFS. "
    +            + "For example, if an object is a directory 'foo', under 
directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' 
will be '/bar'"),
    +    @WritesAttribute(attribute="hdfs.type", description="The type of an 
object. Possible values: directory, file, link"),
    +    @WritesAttribute(attribute="hdfs.owner", description="The user that 
owns the object in HDFS"),
    +    @WritesAttribute(attribute="hdfs.group", description="The group that 
owns the object in HDFS"),
    +    @WritesAttribute(attribute="hdfs.lastModified", description="The 
timestamp of when the object in HDFS was last modified, as milliseconds since 
midnight Jan 1, 1970 UTC"),
    +    @WritesAttribute(attribute="hdfs.length", description=""
    +            + "In case of files: The number of bytes in the file in HDFS.  
"
    +            + "In case of dirs: Retuns storage space consumed by 
directory. "
    +            + ""),
    +    @WritesAttribute(attribute="hdfs.count.files", description="In case of 
type='directory' will represent total count of files under this dir. "
    +            + "Won't be populated to other types of HDFS objects. "),
    +    @WritesAttribute(attribute="hdfs.count.dirs", description="In case of 
type='directory' will represent total count of directories under this dir 
(including itself). "
    +            + "Won't be populated to other types of HDFS objects. "),
    +    @WritesAttribute(attribute="hdfs.replication", description="The number 
of HDFS replicas for the file"),
    +    @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the object in HDFS. This is formatted as 3 characters for the 
owner, "
    +            + "3 for the group, and 3 for other users. For example 
rw-rw-r--"),
    +    @WritesAttribute(attribute="hdfs.status", description="The status 
contains comma separated list of file/dir paths, which couldn't be 
listed/accessed. "
    +            + "Status won't be set if no errors occured."),
    +    @WritesAttribute(attribute="hdfs.full.tree", description="When 
destination is 'attribute', will be populated with full tree of HDFS directory 
in JSON format.")
    +})
    +@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class})
    +public class GetHDFSFileInfo extends AbstractHadoopProcessor {
    +    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
    +    public static final PropertyDescriptor FULL_PATH = new PropertyDescriptor.Builder()
    +            .displayName("Full path")
    +            .name("gethdfsfileinfo-full-path")
    +            .description("A directory to start listing from, or a file's full path.")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .defaultValue("")
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
    +            .displayName("Recurse Subdirectories")
    +            .name("gethdfsfileinfo-recurse-subdirs")
    +            .description("Indicates whether to list files from subdirectories of the HDFS directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DIR_FILTER = new PropertyDescriptor.Builder()
    +            .displayName("Directory Filter")
    +            .name("gethdfsfileinfo-dir-filter")
    +            .description("Regex. Only directories whose names match the given regular expression will be picked up. If not provided, no directory filter will be applied (for performance reasons).")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
    +            .displayName("File Filter")
    +            .name("gethdfsfileinfo-file-filter")
    +            .description("Regex. Only files whose names match the given regular expression will be picked up. If not provided, no file filter will be applied (for performance reasons).")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new PropertyDescriptor.Builder()
    +            .displayName("Exclude Files")
    +            .name("gethdfsfileinfo-file-exclude-filter")
    +            .description("Regex. Files whose names match the given regular expression will not be picked up. If not provided, no exclude filter will be applied (for performance reasons).")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new PropertyDescriptor.Builder()
    +            .displayName("Ignore Dotted Directories")
    +            .name("gethdfsfileinfo-ignore-dotted-dirs")
    +            .description("If true, directories whose names begin with a dot (\".\") will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
    +            .displayName("Ignore Dotted Files")
    +            .name("gethdfsfileinfo-ignore-dotted-files")
    +            .description("If true, files whose names begin with a dot (\".\") will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final AllowableValue GROUP_ALL = new AllowableValue("gethdfsfileinfo-group-all", "All",
    +            "Group all results into a single flowfile.");
    +
    +    static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory",
    +            "Group HDFS objects by their parent directories only. The processor will generate a FlowFile for each directory (if recursive). "
    +            + "If the 'Recurse Subdirectories' property is set to 'false', this will have the same effect as 'All'");
    +
    +    static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None",
    +            "Don't group results. Generate a FlowFile for each HDFS object.");
    +
    +    public static final PropertyDescriptor GROUPING = new PropertyDescriptor.Builder()
    +            .displayName("Group Results")
    +            .name("gethdfsfileinfo-group")
    +            .description("Specifies how HDFS objects should be grouped into FlowFiles")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE)
    +            .defaultValue(GROUP_ALL.getValue())
    +            .build();
    +
    +    static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes",
    +            "Details of the given HDFS object will be stored in FlowFile attributes");
    +
    +    static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content",
    +            "Details of the given HDFS object will be stored in the FlowFile content, in JSON format");
    +
    +    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
    +            .displayName("Destination")
    +            .name("gethdfsfileinfo-destination")
    +            .description("Sets the destination for the results. When set to 'Content', FlowFile attributes won't be used to store the results.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
    +            .defaultValue(DESTINATION_CONTENT.getValue())
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successfully generated FlowFiles are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
    +            .name("not found")
    +            .description("If no objects are found, the original FlowFile is transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .name("original")
    +            .description("Original FlowFiles are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("All failed attempts to access HDFS will be routed to this relationship")
    +            .build();
    +
    +    private HDFSFileInfoRequest req;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(FULL_PATH);
    +        props.add(RECURSE_SUBDIRS);
    +        props.add(DIR_FILTER);
    +        props.add(FILE_FILTER);
    +        props.add(FILE_EXCLUDE_FILTER);
    +        props.add(IGNORE_DOTTED_DIRS);
    +        props.add(IGNORE_DOTTED_FILES);
    +        props.add(GROUPING);
    +        props.add(DESTINATION);
    +        return props;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_NOT_FOUND);
    +        relationships.add(REL_ORIGINAL);
    +        relationships.add(REL_FAILURE);
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext context) {
    +        return super.customValidate(context);
    +    }
    +
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    +        super.onPropertyModified(descriptor, oldValue, newValue);
    +        // drop request details to rebuild it
    +        req = null;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile ff = null;
    +        if (context.hasIncomingConnection()) {
    +            ff = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +            // However, if we have no FlowFile and we have connections coming from other Processors, then
    +            // we know that we should run only if we have a FlowFile.
    +            if (ff == null && context.hasNonLoopConnection()) {
    +                context.yield();
    +                return;
    +            }
    +        }
    +        boolean scheduledFF = false;
    +        if (ff == null) {
    +            ff = session.create();
    +            scheduledFF = true;
    +        }
    +        try {
    +            if (req == null) {
    +                //rebuild the request details based on the current properties and the incoming flowfile
    +                req = buildRequestDetails(context, session, ff);
    +            }else {
    +                //avoid rebuilding the req object's patterns in order to have better performance
    +                req = updateRequestDetails(context, session, ff);
    +            }
    +        }catch(Exception e) {
    +            getLogger().error("Invalid properties: ", e);
    +            if (scheduledFF) {
    +                session.remove(ff);
    +                context.yield();
    +            }else {
    +                ff = session.penalize(ff);
    +                session.rollback();
    +            }
    +            return;
    +        }
    +        try {
    +            final FileSystem hdfs = getFileSystem();
    +            UserGroupInformation ugi = getUserGroupInformation();
    +            HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false);
    +            if (res == null) {
    +                session.transfer(ff, REL_NOT_FOUND);
    +                return;
    +            }
    +            session.transfer(ff, REL_ORIGINAL);
    +        } catch (final IOException | IllegalArgumentException e) {
    +            getLogger().error("Failed to perform listing of HDFS due to 
{}", new Object[] {e});
    +            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " 
+ e);
    +            session.transfer(ff, REL_FAILURE);
    +            return;
    +        } catch (final InterruptedException e) {
    +            Thread.currentThread().interrupt();
    +            getLogger().error("Interrupted while performing listing of 
HDFS", e);
    +            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " 
+ e);
    +            session.transfer(ff, REL_FAILURE);
    +            return;
    +        } catch (final Exception e) {
    +            getLogger().error("Failed to perform listing of HDFS due to 
{}", new Object[] {e});
    +            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " 
+ e);
    +            session.transfer(ff, REL_FAILURE);
    +            return;
    +        }
    +    }
    +
    +
    +    /*
    +     * Walks through the HDFS tree. This method returns null to the caller if the provided path does not exist.
    +     */
    +    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, FlowFile origFF, final FileSystem hdfs,
    +            final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent, final boolean statsOnly) throws Exception{
    +
    +        final HDFSObjectInfoDetails p = parent;
    +
    +        if (!ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(p != null ? p.getPath() : new Path(req.fullPath)))) {
    +            return null;
    +        }
    +
    +        if (parent == null) {
    +            parent = new HDFSObjectInfoDetails(ugi.doAs((PrivilegedExceptionAction<FileStatus>) () -> hdfs.getFileStatus(new Path(req.fullPath))));
    +        }
    +        if (parent.isFile() && p == null) {
    +            //single file path requested and found, lets send to output:
    +            processHDFSObject(context, session, origFF, req, parent, true);
    +            return parent;
    +        }
    +
    +        final Path path = parent.getPath();
    +
    +        FileStatus[] listFSt = null;
    +        try {
    +            listFSt = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
    +        }catch (IOException e) {
    +            parent.error = "Couldn't list directory: " + e;
    +            processHDFSObject(context, session, origFF, req, parent, p == null);
    +            return parent; //File not found exception, or access denied - don't interrupt, just don't list
    +        }
    +        if (listFSt != null) {
    +            for (FileStatus f : listFSt) {
    +                HDFSObjectInfoDetails o = new HDFSObjectInfoDetails(f);
    +                HDFSObjectInfoDetails vo = validateMatchingPatterns(o, req);
    +                if (o.isDirectory() && !o.isSymlink() && req.isRecursive) {
    +                    o = walkHDFSTree(context, session, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
    +                    parent.countDirs += o.countDirs;
    +                    parent.totalLen += o.totalLen;
    +                    parent.countFiles += o.countFiles;
    +                }else if (o.isDirectory() && o.isSymlink()) {
    +                    parent.countDirs += 1;
    +                }else if (o.isFile() && !o.isSymlink()) {
    +                    parent.countFiles += 1;
    +                    parent.totalLen += o.getLen();
    +                }else if (o.isFile() && o.isSymlink()) {
    +                    parent.countFiles += 1; // do not add length of the symlink, as it doesn't consume space under THIS directory, but count files, as it is still an object.
    +                }
    +
    +                // Decide what to do with the child: if a FF per object or per dir was requested, just emit a new FF with the info in the 'o' object
    +                if (vo != null && !statsOnly) {
    +                    parent.addChild(vo);
    +                    if (p != null && req.isRecursive
    +                            && vo.isFile() && !vo.isSymlink()) {
    +                        processHDFSObject(context, session, origFF, req, vo, false);
    +                    }
    +                }
    +            }
    +            if (!statsOnly) {
    +                processHDFSObject(context, session, origFF, req, parent, p == null);
    +            }
    +            if (req.groupping != Groupping.ALL) {
    +                parent.setChildren(null); //we need children in the full tree only when a single output is requested.
    +            }
    +        }
    +
    +        return parent;
    +    }
    +
    +    protected HDFSObjectInfoDetails validateMatchingPatterns(final HDFSObjectInfoDetails o, HDFSFileInfoRequest req) {
    +        if (o == null || o.getPath() == null) {
    +            return null;
    +        }
    +
    +        if (o.isFile()) {
    +            if (req.isIgnoreDotFiles && o.getPath().getName().startsWith(".")) {
    +                return null;
    +            }else if (req.fileExcludeFilter != null && req.fileExcludeFilter.matcher(o.getPath().getName()).matches()) {
    +                return null;
    +            }else if (req.fileFilter == null) {
    +                return o;
    +            }else if (req.fileFilter != null && req.fileFilter.matcher(o.getPath().getName()).matches()) {
    +                return o;
    +            }else {
    +                return null;
    +            }
    +        }
    +        if (o.isDirectory()) {
    +            if (req.isIgnoreDotDirs && o.getPath().getName().startsWith(".")) {
    +                return null;
    +            }else if (req.dirFilter == null) {
    +                return o;
    +            }else if (req.dirFilter != null && req.dirFilter.matcher(o.getPath().getName()).matches()) {
    +                return o;
    +            }else {
    +                return null;
    +            }
    +        }
    +        return null;
    +    }
    +
    +    /*
    +     * Checks whether HDFS object should be sent to output.
    +     * If it should be sent, a new flowfile will be created, and its content and attributes will be populated according to the other request params.
    +     */
    +    protected HDFSObjectInfoDetails processHDFSObject(final ProcessContext context, final ProcessSession session,
    +                    FlowFile origFF, final HDFSFileInfoRequest req, final HDFSObjectInfoDetails o, final boolean isRoot) {
    +        if (o.isFile() && req.groupping != Groupping.NONE) {
    +            return null; //there is grouping by either root directory or every directory, no need to print separate files.
    +        }
    +        if (o.isDirectory() && o.isSymlink() && req.groupping != Groupping.NONE) {
    +            return null; //ignore symlink dirs an
    +        }
    +        if (o.isDirectory() && req.groupping == Groupping.ALL && !isRoot) {
    +            return null;
    +        }
    +        FlowFile ff = session.create(origFF);
    +        //won't combine conditions for similar actions for better readability and maintenance.
    +        if (o.isFile() && isRoot && req.isDestContent) {
    +            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
    +            // ------------------------------
    +        }else if (o.isFile() && isRoot && !req.isDestContent) {
    +            ff = session.putAllAttributes(ff, o.toAttributesMap());
    +            // ------------------------------
    +        }else if (o.isFile() && req.isDestContent) {
    +            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
    +            // ------------------------------
    +        }else if (o.isFile() && !req.isDestContent) {
    +            ff = session.putAllAttributes(ff, o.toAttributesMap());
    +            // ------------------------------
    +        }else if (o.isDirectory() && o.isSymlink() && req.isDestContent) {
    +            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
    +            // ------------------------------
    +        }else if (o.isDirectory() && o.isSymlink() && !req.isDestContent) {
    +            ff = session.putAllAttributes(ff, o.toAttributesMap());
    +            // ------------------------------
    +        }else if (o.isDirectory() && req.groupping == Groupping.NONE && req.isDestContent) {
    +            o.setChildren(null);
    +            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
    +            // ------------------------------
    +        }else if (o.isDirectory() && req.groupping == Groupping.NONE && !req.isDestContent) {
    +            ff = session.putAllAttributes(ff, o.toAttributesMap());
    +            // ------------------------------
    +        }else if (o.isDirectory() && req.groupping == Groupping.DIR && req.isDestContent) {
    +            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
    +            // ------------------------------
    +        }else if (o.isDirectory() && req.groupping == Groupping.DIR && !req.isDestContent) {
    +            ff = session.putAllAttributes(ff, o.toAttributesMap());
    +            ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
    +            // ------------------------------
    +        }else if (o.isDirectory() && req.groupping == Groupping.ALL && req.isDestContent) {
    +            ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes()));
    +            // ------------------------------
    +        }else if (o.isDirectory() && req.groupping == Groupping.ALL && !req.isDestContent) {
    +            ff = session.putAllAttributes(ff, o.toAttributesMap());
    +            ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString());
    +        }else {
    +            getLogger().error("Illegal State!");
    +            session.remove(ff);
    +            return null;
    +        }
    +
    +        session.transfer(ff, REL_SUCCESS);
    +        return o;
    +    }
    +
    +    /*
    +     * Returns permissions in readable format like rwxr-xr-x (755)
    +     */
    +    protected String getPerms(final FsPermission permission) {
    +
    +        final StringBuilder sb = new StringBuilder();
    +        for (FsAction action : new FsAction[]{permission.getUserAction(), permission.getGroupAction(), permission.getOtherAction()}) {
    +            if (action.implies(FsAction.READ)) {
    +                sb.append("r");
    +            } else {
    +                sb.append("-");
    +            }
    +
    +            if (action.implies(FsAction.WRITE)) {
    +                sb.append("w");
    +            } else {
    +                sb.append("-");
    +            }
    +
    +            if (action.implies(FsAction.EXECUTE)) {
    +                sb.append("x");
    +            } else {
    +                sb.append("-");
    +            }
    +        }
    +
    +        return sb.toString();
    +    }
    +
    +    /*
    +     * Creates the internal request object and initializes the fields that won't change between calls (onTrigger).
    +     * Dynamic fields will be updated separately on each call.
    +     */
    +    protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
    +        HDFSFileInfoRequest req = new HDFSFileInfoRequest();
    +        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
    +        req.isRecursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
    +
    +        PropertyValue pv = null;
    +        String v = null;
    +
    +        if (context.getProperty(DIR_FILTER).isSet() && (pv=context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff))!=null) {
    +            v = pv.getValue();
    +            req.dirFilter = v == null ? null : Pattern.compile(v);
    +        }
    +
    +        if (context.getProperty(FILE_FILTER).isSet() && (pv=context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff))!=null) {
    +            v = pv.getValue();
    +            req.fileFilter = v == null ? null : Pattern.compile(v);
    +        }
    +
    +        if (context.getProperty(FILE_EXCLUDE_FILTER).isSet() && (pv=context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff))!=null) {
    +            v = pv.getValue();
    +            req.fileExcludeFilter = v == null ? null : Pattern.compile(v);
    +        }
    +
    +        req.isIgnoreDotFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
    +        req.isIgnoreDotDirs = context.getProperty(IGNORE_DOTTED_DIRS).asBoolean();
    +
    +        req.groupping = HDFSFileInfoRequest.Groupping.getEnum(context.getProperty(GROUPING).getValue());
    +
    +        v = context.getProperty(DESTINATION).getValue();
    +        if (DESTINATION_CONTENT.getValue().equals(v)) {
    +            req.isDestContent = true;
    +        }else {
    +            req.isDestContent = false;
    +        }
    +
    +        return req;
    +    }
    +
    +    /*
    +     * Creates the internal request object if it was not created previously, and updates it with dynamic properties every time onTrigger is called.
    +     * Avoids creating regex Pattern objects unless their actual values have changed due to EL evaluation.
    +     */
    +    protected HDFSFileInfoRequest updateRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
    +
    +        if (req == null) {
    +            return buildRequestDetails(context, session, ff);
    +        }
    +        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
    +
    +        String currValue = null;
    +        String oldValue = null;
    +
    +        currValue = context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff).getValue();
    +        oldValue = req.dirFilter == null ? null : req.dirFilter.toString();
    +        if (StringUtils.compare(currValue, oldValue) != 0) {
    +            req.dirFilter = currValue == null ? null : Pattern.compile(currValue);
    +        }
    +
    +
    +        currValue = context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff).getValue();
    +        oldValue = req.fileFilter == null ? null : req.fileFilter.toString();
    +        if (StringUtils.compare(currValue, oldValue) != 0) {
    +            req.fileFilter = currValue == null ? null : Pattern.compile(currValue);
    +        }
    +
    +
    +        currValue = context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff).getValue();
    +        oldValue = req.fileExcludeFilter == null ? null : req.fileExcludeFilter.toString();
    +        if (StringUtils.compare(currValue, oldValue) != 0) {
    +            req.fileExcludeFilter = currValue == null ? null : Pattern.compile(currValue);
    +        }
    +
    +        return req;
    +    }
    +
    +    /*
    +     * Keeps all request details in single object.
    +     */
    +    static class HDFSFileInfoRequest{
    +        enum Groupping {
    +            ALL(GROUP_ALL.getValue()),
    +            DIR(GROUP_PARENT_DIR.getValue()),
    +            NONE(GROUP_NONE.getValue());
    +
    +            private String val;
    +
    +            Groupping(String val){
    +                this.val = val;
    +            }
    +
    +            public String toString() {
    +                return this.val;
    +            }
    +
    +            public static Groupping getEnum(String value) {
    +                for (Groupping v : values()) {
    +                    if (v.val.equals(value)) {
    +                        return v;
    +                    }
    +                }
    +                return null;
    +            }
    +        }
    +
    +        String fullPath;
    +        boolean isRecursive;
    +        Pattern dirFilter;
    +        Pattern fileFilter;
    +        Pattern fileExcludeFilter;
    +        boolean isIgnoreDotFiles;
    +        boolean isIgnoreDotDirs;
    +        boolean isDestContent;
    +        Groupping groupping;
    +    }
    +
    +    /*
    +     * Keeps details of HDFS objects.
    +     * This class is based on FileStatus and adds additional features/properties for counts, the total size of directories, and the subtree/hierarchy of recursive listings.
    +     */
    +    class HDFSObjectInfoDetails extends FileStatus{
    +
    +        private long countFiles;
    +        private long countDirs = 1;
    +        private long totalLen;
    +        private Collection<HDFSObjectInfoDetails> children = new LinkedList<>();
    +        private String error;
    +
    +        HDFSObjectInfoDetails(FileStatus fs) throws IOException{
    +            super(fs);
    +        }
    +
    +        public long getCountFiles() {
    +            return countFiles;
    +        }
    +
    +        public void setCountFiles(long countFiles) {
    +            this.countFiles = countFiles;
    +        }
    +
    +        public long getCountDirs() {
    +            return countDirs;
    +        }
    +
    +        public void setCountDirs(long countDirs) {
    +            this.countDirs = countDirs;
    +        }
    +
    +        public long getTotalLen() {
    +            return totalLen;
    +        }
    +
    +        public void setTotalLen(long totalLen) {
    +            this.totalLen = totalLen;
    +        }
    +
    +        public Collection<HDFSObjectInfoDetails> getChildren() {
    +            return children;
    +        }
    +
    +        public String getError() {
    +            return error;
    +        }
    +
    +        public void setError(String error) {
    +            this.error = error;
    +        }
    +
    +        public void setChildren(Collection<HDFSObjectInfoDetails> children) {
    +            this.children = children;
    +        }
    +
    +        public void addChild(HDFSObjectInfoDetails child) {
    +            this.children.add(child);
    +        }
    +
    +        public void updateTotals(boolean deepUpdate) {
    +            if (deepUpdate) {
    +                this.countDirs = 1;
    +                this.countFiles = 0;
    +                this.totalLen = 0;
    +            }
    +
    +            for(HDFSObjectInfoDetails c : children) {
    +                if (c.isSymlink()) {
    +                    continue; //do not count symlinks. They will either be counted under their actual directories, or won't be counted if the actual location is not under the provided root for the scan.
    +                }else if (c.isDirectory()) {
    +                    if (deepUpdate) {
    +                        c.updateTotals(deepUpdate);
    +                    }
    +                    this.totalLen += c.totalLen;
    +                    this.countDirs += c.countDirs;
    +                    this.countFiles += c.countFiles;
    +                }else if (c.isFile()) {
    +                    this.totalLen += c.getLen();
    +                    this.countFiles++;
    +                }
    +            }
    +
    +        }
    +
    +        /*
    +         * Since, by definition, the FF will keep only attributes for the parent/single object, we don't need to recurse into the children
    +         */
    +        public Map<String, String> toAttributesMap(){
    --- End diff --
    
    If the destination is content, then we should be setting the mime.type attribute to application/json.
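    
    A minimal sketch of what that could look like, assuming the standard CoreAttributes helper from nifi-api (the helper class and its placement relative to processHDFSObject are illustrative, not part of the patch):
    
        import java.nio.charset.StandardCharsets;
    
        import org.apache.nifi.flowfile.FlowFile;
        import org.apache.nifi.flowfile.attributes.CoreAttributes;
        import org.apache.nifi.processor.ProcessSession;
    
        // Hypothetical helper illustrating the suggestion: write the JSON payload
        // and mark the FlowFile content type as application/json.
        class JsonDestinationSketch {
            static FlowFile writeJson(final ProcessSession session, FlowFile ff, final String json) {
                ff = session.write(ff, out -> out.write(json.getBytes(StandardCharsets.UTF_8)));
                return session.putAttribute(ff, CoreAttributes.MIME_TYPE.key(), "application/json");
            }
        }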


> Add GetHdfsFileInfo Processor
> -----------------------------
>
>                 Key: NIFI-4906
>                 URL: https://issues.apache.org/jira/browse/NIFI-4906
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Ed Berezitsky
>            Assignee: Ed Berezitsky
>            Priority: Major
>              Labels: patch, pull-request-available
>         Attachments: NiFi-GetHDFSFileInfo.pdf, gethdfsfileinfo.patch
>
>
> Add *GetHdfsFileInfo* Processor to be able to get stats from a file system.
> This processor should support recursive scan, getting information of 
> directories and files.
> _File-level info required_: name, path, length, modified timestamp, last 
> access timestamp, owner, group, permissions.
> _Directory-level info required_: name, path, sum of lengths of files under a 
> dir, count of files under a dir, modified timestamp, last access timestamp, 
> owner, group, permissions.
>  
> The result returned:
>  * in single flow file (in content - a json line per file/dir info);
>  * flow file per each file/dir info (in content as json obj or in set of 
> attributes by the choice).
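
For illustration only, one per-directory JSON record might look roughly like the sketch below. The field names are hypothetical, derived from the hdfs.* attributes documented in the diff above; the actual layout is produced by toJsonString() in the PR and is not shown in this excerpt.

    {
      "objectName": "bar",
      "path": "/foo",
      "type": "directory",
      "owner": "hdfs",
      "group": "hadoop",
      "lastModified": 1520000000000,
      "length": 123456,
      "countFiles": 4,
      "countDirs": 2,
      "replication": 0,
      "permissions": "rwxr-xr-x"
    }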



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
