Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2639#discussion_r194446685 --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java --- @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files and directories from HDFS. " + + "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. " + + "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. " + + "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. " + ) +@WritesAttributes({ + @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."), + @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. " + + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"), + @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"), + @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS"), + @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS"), + @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), + @WritesAttribute(attribute="hdfs.length", description="" + + "In case of files: The number of bytes in the file in HDFS. " + + "In case of dirs: Retuns storage space consumed by directory. " + + ""), + @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory' will represent total count of files under this dir. 
" + + "Won't be populated to other types of HDFS objects. "), + @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory' will represent total count of directories under this dir (including itself). " + + "Won't be populated to other types of HDFS objects. "), + @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"), + @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, " + + "3 for the group, and 3 for other users. For example rw-rw-r--"), + @WritesAttribute(attribute="hdfs.status", description="The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. " + + "Status won't be set if no errors occured."), + @WritesAttribute(attribute="hdfs.full.tree", description="When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format.") +}) +@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class}) +public class GetHDFSFileInfo extends AbstractHadoopProcessor { + static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L); + public static final PropertyDescriptor FULL_PATH = new PropertyDescriptor.Builder() + .displayName("Full path") + .name("gethdfsfileinfo-full-path") + .description("A directory to start listing from, or a file's full path.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() + .displayName("Recurse Subdirectories") + .name("gethdfsfileinfo-recurse-subdirs") + .description("Indicates whether to list files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + 
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor DIR_FILTER = new PropertyDescriptor.Builder() + .displayName("Directory Filter") + .name("gethdfsfileinfo-dir-filter") + .description("Regex. Only directories whose names match the given regular expression will be picked up. If not provided, any filter would be apply (performance considerations).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .displayName("File Filter") + .name("gethdfsfileinfo-file-filter") + .description("Regex. Only files whose names match the given regular expression will be picked up. If not provided, any filter would be apply (performance considerations).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new PropertyDescriptor.Builder() + .displayName("Exclude Files") + .name("gethdfsfileinfo-file-exclude-filter") + .description("Regex. Files whose names match the given regular expression will not be picked up. 
If not provided, any filter won't be apply (performance considerations).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new PropertyDescriptor.Builder() + .displayName("Ignore Dotted Directories") + .name("gethdfsfileinfo-ignore-dotted-dirs") + .description("If true, directories whose names begin with a dot (\".\") will be ignored") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() + .displayName("Ignore Dotted Files") + .name("gethdfsfileinfo-ignore-dotted-files") + .description("If true, files whose names begin with a dot (\".\") will be ignored") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + static final AllowableValue GROUP_ALL = new AllowableValue("gethdfsfileinfo-group-all", "All", + "Group all results into a single flowfile."); + + static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory", + "Group HDFS objects by their parent directories only. Processor will generate flowfile for each directory (if recursive). " + + "If 'Recurse Subdirectories' property set to 'false', then will have the same effect as 'All'"); + + static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None", + "Don't group results. 
Generate flowfile per each HDFS object."); + + public static final PropertyDescriptor GROUPING = new PropertyDescriptor.Builder() + .displayName("Group Results") + .name("gethdfsfileinfo-group") + .description("Groups HDFS objects") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE) + .defaultValue(GROUP_ALL.getValue()) + .build(); + + static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes", + "Details of given HDFS object will be stored in attributes of flowfile"); + + static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content", + "Details of given HDFS object will be stored in a content in JSON format"); + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .displayName("Destination") + .name("gethdfsfileinfo-destination") + .description("Sets the destination for the resutls. When set to 'Content', attributes of flowfile won't be used for storing results. 
") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT) + .defaultValue(DESTINATION_CONTENT.getValue()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successfully generated FlowFiles are transferred to this relationship") + .build(); + + public static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("If no objects are found, original FlowFile are transferred to this relationship") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Original FlowFiles are transferred to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All failed attempts to access HDFS will be routed to this relationship") + .build(); + + private HDFSFileInfoRequest req; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(FULL_PATH); + props.add(RECURSE_SUBDIRS); + props.add(DIR_FILTER); + props.add(FILE_FILTER); + props.add(FILE_EXCLUDE_FILTER); + props.add(IGNORE_DOTTED_DIRS); + props.add(IGNORE_DOTTED_FILES); + props.add(GROUPING); + props.add(DESTINATION); + return props; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_NOT_FOUND); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + return 
super.customValidate(context); + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + // drop request details to rebuild it + req = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile ff = null; + if (context.hasIncomingConnection()) { + ff = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. + if (ff == null && context.hasNonLoopConnection()) { + context.yield(); + return; + } + } + boolean scheduledFF = false; + if (ff == null) { + ff = session.create(); + scheduledFF = true; + } + try { + if (req == null) { + //rebuild the request details based on + req = buildRequestDetails(context, session, ff); + }else { + //avoid rebuilding req object's patterns in order to have better performance + req = updateRequestDetails(context, session, ff); + } + }catch(Exception e) { + getLogger().error("Invalid properties: ", e); + if (scheduledFF) { + session.remove(ff); + context.yield(); + }else { + ff = session.penalize(ff); + session.rollback(); + } + return; + } + try { + final FileSystem hdfs = getFileSystem(); + UserGroupInformation ugi = getUserGroupInformation(); + HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false); + if (res == null) { + session.transfer(ff, REL_NOT_FOUND); --- End diff -- In the case where there are no incoming connections, the "ff" flow file has no info about what was not found. Should we add an attribute with the path that it was looking at?
---