Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2639#discussion_r194446685
  
    --- Diff: 
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
 ---
    @@ -0,0 +1,803 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import java.io.IOException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsAction;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import 
org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@InputRequirement(Requirement.INPUT_ALLOWED)
    +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
    +@CapabilityDescription("Retrieves a listing of files and directories from 
HDFS. "
    +        + "This processor creates a FlowFile(s) that represents the HDFS 
file/dir with relevant information. "
    +        + "Main purpose of this processor to provide functionality similar 
to HDFS Client, i.e. count, du, ls, test, etc. "
    +        + "Unlike ListHDFS, this processor is stateless, supports incoming 
connections and provides information on a dir level. "
    +        )
    +@WritesAttributes({
    +    @WritesAttribute(attribute="hdfs.objectName", description="The name of 
the file/dir found on HDFS."),
    +    @WritesAttribute(attribute="hdfs.path", description="The path is set 
to the absolute path of the object's parent directory on HDFS. "
    +            + "For example, if an object is a directory 'foo', under 
directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' 
will be '/bar'"),
    +    @WritesAttribute(attribute="hdfs.type", description="The type of an 
object. Possible values: directory, file, link"),
    +    @WritesAttribute(attribute="hdfs.owner", description="The user that 
owns the object in HDFS"),
    +    @WritesAttribute(attribute="hdfs.group", description="The group that 
owns the object in HDFS"),
    +    @WritesAttribute(attribute="hdfs.lastModified", description="The 
timestamp of when the object in HDFS was last modified, as milliseconds since 
midnight Jan 1, 1970 UTC"),
    +    @WritesAttribute(attribute="hdfs.length", description=""
    +            + "In case of files: The number of bytes in the file in HDFS.  
"
    +            + "In case of dirs: Retuns storage space consumed by 
directory. "
    +            + ""),
    +    @WritesAttribute(attribute="hdfs.count.files", description="In case of 
type='directory' will represent total count of files under this dir. "
    +            + "Won't be populated to other types of HDFS objects. "),
    +    @WritesAttribute(attribute="hdfs.count.dirs", description="In case of 
type='directory' will represent total count of directories under this dir 
(including itself). "
    +            + "Won't be populated to other types of HDFS objects. "),
    +    @WritesAttribute(attribute="hdfs.replication", description="The number 
of HDFS replicas for the file"),
    +    @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the object in HDFS. This is formatted as 3 characters for the 
owner, "
    +            + "3 for the group, and 3 for other users. For example 
rw-rw-r--"),
    +    @WritesAttribute(attribute="hdfs.status", description="The status 
contains comma separated list of file/dir paths, which couldn't be 
listed/accessed. "
    +            + "Status won't be set if no errors occured."),
    +    @WritesAttribute(attribute="hdfs.full.tree", description="When 
destination is 'attribute', will be populated with full tree of HDFS directory 
in JSON format.")
    +})
    +@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class})
    +public class GetHDFSFileInfo extends AbstractHadoopProcessor {
    +    static final long LISTING_LAG_NANOS = 
TimeUnit.MILLISECONDS.toNanos(100L);
    +    public static final PropertyDescriptor FULL_PATH = new 
PropertyDescriptor.Builder()
    +            .displayName("Full path")
    +            .name("gethdfsfileinfo-full-path")
    +            .description("A directory to start listing from, or a file's 
full path.")
    +            .required(true)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .defaultValue("")
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
    +            .displayName("Recurse Subdirectories")
    +            .name("gethdfsfileinfo-recurse-subdirs")
    +            .description("Indicates whether to list files from 
subdirectories of the HDFS directory")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DIR_FILTER = new 
PropertyDescriptor.Builder()
    +            .displayName("Directory Filter")
    +            .name("gethdfsfileinfo-dir-filter")
    +            .description("Regex. Only directories whose names match the 
given regular expression will be picked up. If not provided, any filter would 
be apply (performance considerations).")
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_FILTER = new 
PropertyDescriptor.Builder()
    +            .displayName("File Filter")
    +            .name("gethdfsfileinfo-file-filter")
    +            .description("Regex. Only files whose names match the given 
regular expression will be picked up. If not provided, any filter would be 
apply (performance considerations).")
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new 
PropertyDescriptor.Builder()
    +            .displayName("Exclude Files")
    +            .name("gethdfsfileinfo-file-exclude-filter")
    +            .description("Regex. Files whose names match the given regular 
expression will not be picked up. If not provided, any filter won't be apply 
(performance considerations).")
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new 
PropertyDescriptor.Builder()
    +            .displayName("Ignore Dotted Directories")
    +            .name("gethdfsfileinfo-ignore-dotted-dirs")
    +            .description("If true, directories whose names begin with a 
dot (\".\") will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new 
PropertyDescriptor.Builder()
    +            .displayName("Ignore Dotted Files")
    +            .name("gethdfsfileinfo-ignore-dotted-files")
    +            .description("If true, files whose names begin with a dot 
(\".\") will be ignored")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final AllowableValue GROUP_ALL = new 
AllowableValue("gethdfsfileinfo-group-all", "All",
    +            "Group all results into a single flowfile.");
    +
    +    static final AllowableValue GROUP_PARENT_DIR = new 
AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory",
    +            "Group HDFS objects by their parent directories only. 
Processor will generate flowfile for each directory (if recursive). "
    +            + "If 'Recurse Subdirectories' property set to 'false', then 
will have the same effect as 'All'");
    +
    +    static final AllowableValue GROUP_NONE = new 
AllowableValue("gethdfsfileinfo-group-none", "None",
    +            "Don't group results. Generate flowfile per each HDFS 
object.");
    +
    +    public static final PropertyDescriptor GROUPING = new 
PropertyDescriptor.Builder()
    +            .displayName("Group Results")
    +            .name("gethdfsfileinfo-group")
    +            .description("Groups HDFS objects")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE)
    +            .defaultValue(GROUP_ALL.getValue())
    +            .build();
    +
    +    static final AllowableValue DESTINATION_ATTRIBUTES = new 
AllowableValue("gethdfsfileinfo-dest-attr", "Attributes",
    +            "Details of given HDFS object will be stored in attributes of 
flowfile");
    +
    +    static final AllowableValue DESTINATION_CONTENT = new 
AllowableValue("gethdfsfileinfo-dest-content", "Content",
    +            "Details of given HDFS object will be stored in a content in 
JSON format");
    +
    +    public static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
    +            .displayName("Destination")
    +            .name("gethdfsfileinfo-destination")
    +            .description("Sets the destination for the resutls. When set 
to 'Content', attributes of flowfile won't be used for storing results. ")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
    +            .defaultValue(DESTINATION_CONTENT.getValue())
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("All successfully generated FlowFiles are 
transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_NOT_FOUND = new 
Relationship.Builder()
    +            .name("not found")
    +            .description("If no objects are found, original FlowFile are 
transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder()
    +            .name("original")
    +            .description("Original FlowFiles are transferred to this 
relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("All failed attempts to access HDFS will be 
routed to this relationship")
    +            .build();
    +
    +    private HDFSFileInfoRequest req;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(FULL_PATH);
    +        props.add(RECURSE_SUBDIRS);
    +        props.add(DIR_FILTER);
    +        props.add(FILE_FILTER);
    +        props.add(FILE_EXCLUDE_FILTER);
    +        props.add(IGNORE_DOTTED_DIRS);
    +        props.add(IGNORE_DOTTED_FILES);
    +        props.add(GROUPING);
    +        props.add(DESTINATION);
    +        return props;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_NOT_FOUND);
    +        relationships.add(REL_ORIGINAL);
    +        relationships.add(REL_FAILURE);
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext context) {
    +        return super.customValidate(context);
    +    }
    +
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
    +        super.onPropertyModified(descriptor, oldValue, newValue);
    +        // drop request details to rebuild it
    +        req = null;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        FlowFile ff = null;
    +        if (context.hasIncomingConnection()) {
    +            ff = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
    +            // However, if we have no FlowFile and we have connections 
coming from other Processors, then
    +            // we know that we should run only if we have a FlowFile.
    +            if (ff == null && context.hasNonLoopConnection()) {
    +                context.yield();
    +                return;
    +            }
    +        }
    +        boolean scheduledFF = false;
    +        if (ff == null) {
    +            ff = session.create();
    +            scheduledFF = true;
    +        }
    +        try {
    +            if (req == null) {
    +                //rebuild the request details based on
    +                req = buildRequestDetails(context, session, ff);
    +            }else {
    +                //avoid rebuilding req object's patterns in order to have 
better performance
    +                req = updateRequestDetails(context, session, ff);
    +            }
    +        }catch(Exception e) {
    +            getLogger().error("Invalid properties: ", e);
    +            if (scheduledFF) {
    +                session.remove(ff);
    +                context.yield();
    +            }else {
    +                ff = session.penalize(ff);
    +                session.rollback();
    +            }
    +            return;
    +        }
    +        try {
    +            final FileSystem hdfs = getFileSystem();
    +            UserGroupInformation ugi = getUserGroupInformation();
    +            HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, 
hdfs, ugi, req, null, false);
    +            if (res == null) {
    +                session.transfer(ff, REL_NOT_FOUND);
    --- End diff --
    
    In the case where there are no incoming connections, the "ff" flow file has 
no info about what was not found. Should we add an attribute with the path that 
the processor was looking at?


---

Reply via email to