[ https://issues.apache.org/jira/browse/NIFI-4906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508396#comment-16508396 ]
ASF GitHub Bot commented on NIFI-4906:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2639#discussion_r194430127

--- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java ---
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files and directories from HDFS. "
+        + "This processor creates FlowFiles that represent the HDFS files/directories with relevant information. "
+        + "The main purpose of this processor is to provide functionality similar to the HDFS client, i.e. count, du, ls, test, etc. "
+        + "Unlike ListHDFS, this processor is stateless, supports incoming connections, and provides information on a directory level.")
" + + "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. " + + "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. " + + "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. " + ) +@WritesAttributes({ + @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."), + @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. " + + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"), + @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"), + @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS"), + @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS"), + @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), + @WritesAttribute(attribute="hdfs.length", description="" + + "In case of files: The number of bytes in the file in HDFS. " + + "In case of dirs: Retuns storage space consumed by directory. " + + ""), + @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory' will represent total count of files under this dir. " + + "Won't be populated to other types of HDFS objects. "), + @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory' will represent total count of directories under this dir (including itself). " + + "Won't be populated to other types of HDFS objects. "), + @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"), + @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, " + + "3 for the group, and 3 for other users. For example rw-rw-r--"), + @WritesAttribute(attribute="hdfs.status", description="The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. 
" + + "Status won't be set if no errors occured."), + @WritesAttribute(attribute="hdfs.full.tree", description="When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format.") +}) +@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class}) +public class GetHDFSFileInfo extends AbstractHadoopProcessor { + static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L); + public static final PropertyDescriptor FULL_PATH = new PropertyDescriptor.Builder() + .displayName("Full path") + .name("gethdfsfileinfo-full-path") + .description("A directory to start listing from, or a file's full path.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() + .displayName("Recurse Subdirectories") + .name("gethdfsfileinfo-recurse-subdirs") + .description("Indicates whether to list files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor DIR_FILTER = new PropertyDescriptor.Builder() + .displayName("Directory Filter") + .name("gethdfsfileinfo-dir-filter") + .description("Regex. Only directories whose names match the given regular expression will be picked up. If not provided, any filter would be apply (performance considerations).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .displayName("File Filter") + .name("gethdfsfileinfo-file-filter") + .description("Regex. Only files whose names match the given regular expression will be picked up. If not provided, any filter would be apply (performance considerations).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new PropertyDescriptor.Builder() + .displayName("Exclude Files") + .name("gethdfsfileinfo-file-exclude-filter") + .description("Regex. Files whose names match the given regular expression will not be picked up. 
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new PropertyDescriptor.Builder()
+            .displayName("Ignore Dotted Directories")
+            .name("gethdfsfileinfo-ignore-dotted-dirs")
+            .description("If true, directories whose names begin with a dot (\".\") will be ignored")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+            .displayName("Ignore Dotted Files")
+            .name("gethdfsfileinfo-ignore-dotted-files")
+            .description("If true, files whose names begin with a dot (\".\") will be ignored")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final AllowableValue GROUP_ALL = new AllowableValue("gethdfsfileinfo-group-all", "All",
+            "Group all results into a single flowfile.");
+
+    static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory",
+            "Group HDFS objects by their parent directories only. The processor will generate a flowfile for each directory (if recursive). "
+            + "If the 'Recurse Subdirectories' property is set to 'false', this will have the same effect as 'All'.");
+
+    static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None",
+            "Don't group results. Generate a flowfile for each HDFS object.");
+
+    public static final PropertyDescriptor GROUPING = new PropertyDescriptor.Builder()
+            .displayName("Group Results")
+            .name("gethdfsfileinfo-group")
+            .description("Groups HDFS objects")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE)
+            .defaultValue(GROUP_ALL.getValue())
+            .build();
+
+    static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes",
+            "Details of the given HDFS object will be stored in the flowfile's attributes");
+
+    static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content",
+            "Details of the given HDFS object will be stored in the flowfile's content, in JSON format");
+
+    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .displayName("Destination")
+            .name("gethdfsfileinfo-destination")
+            .description("Sets the destination for the results. When set to 'Content', the flowfile's attributes won't be used for storing results.")
") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT) + .defaultValue(DESTINATION_CONTENT.getValue()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successfully generated FlowFiles are transferred to this relationship") + .build(); + + public static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("If no objects are found, original FlowFile are transferred to this relationship") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Original FlowFiles are transferred to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All failed attempts to access HDFS will be routed to this relationship") + .build(); + + private HDFSFileInfoRequest req; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(FULL_PATH); + props.add(RECURSE_SUBDIRS); + props.add(DIR_FILTER); + props.add(FILE_FILTER); + props.add(FILE_EXCLUDE_FILTER); + props.add(IGNORE_DOTTED_DIRS); + props.add(IGNORE_DOTTED_FILES); + props.add(GROUPING); + props.add(DESTINATION); + return props; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_NOT_FOUND); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + return super.customValidate(context); + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + // drop request details to rebuild it + req = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile ff = null; + if (context.hasIncomingConnection()) { + ff = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. + if (ff == null && context.hasNonLoopConnection()) { + context.yield(); + return; + } + } + boolean scheduledFF = false; + if (ff == null) { + ff = session.create(); + scheduledFF = true; + } + try { + if (req == null) { + //rebuild the request details based on + req = buildRequestDetails(context, session, ff); + }else { + //avoid rebuilding req object's patterns in order to have better performance + req = updateRequestDetails(context, session, ff); + } + }catch(Exception e) { + getLogger().error("Invalid properties: ", e); + if (scheduledFF) { + session.remove(ff); + context.yield(); + }else { + ff = session.penalize(ff); + session.rollback(); --- End diff -- If we couldn't build a request for the current flow file because some properties were incorrect, should this be routed to failure instead? 
or maybe an "invalid" relationship to indicate a difference from other failures? By rolling back we are putting the flow file back in the incoming queue (if we got it from the queue) and I would expect it would keep failing over and over if it didn't work the first time. > Add GetHdfsFileInfo Processor > ----------------------------- > > Key: NIFI-4906 > URL: https://issues.apache.org/jira/browse/NIFI-4906 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Ed Berezitsky > Assignee: Ed Berezitsky > Priority: Major > Labels: patch, pull-request-available > Attachments: NiFi-GetHDFSFileInfo.pdf, gethdfsfileinfo.patch > > > Add *GetHdfsFileInfo* Processor to be able to get stats from a file system. > This processor should support recursive scan, getting information of > directories and files. > _File-level info required_: name, path, length, modified timestamp, last > access timestamp, owner, group, permissions. > _Directory-level info required_: name, path, sum of lengths of files under a > dir, count of files under a dir, modified timestamp, last access timestamp, > owner, group, permissions. > > The result returned: > * in single flow file (in content - a json line per file/dir info); > * flow file per each file/dir info (in content as json obj or in set of > attributes by the choice). -- This message was sent by Atlassian JIRA (v7.6.3#76005)