[
https://issues.apache.org/jira/browse/NIFI-4906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514838#comment-16514838
]
ASF GitHub Bot commented on NIFI-4906:
--------------------------------------
Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2639#discussion_r195906907
--- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java ---
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files and directories from
HDFS. "
+ + "This processor creates a FlowFile(s) that represents the HDFS
file/dir with relevant information. "
+ + "Main purpose of this processor to provide functionality similar
to HDFS Client, i.e. count, du, ls, test, etc. "
+ + "Unlike ListHDFS, this processor is stateless, supports incoming
connections and provides information on a dir level. "
+ )
+@WritesAttributes({
+    @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."),
+    @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. "
+            + "For example, if an object is a directory 'foo' under directory '/bar', then 'hdfs.objectName' will have the value 'foo', and 'hdfs.path' will be '/bar'."),
+    @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link."),
+    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS."),
+    @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS."),
+    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC."),
+    @WritesAttribute(attribute="hdfs.length", description="In case of files: the number of bytes in the file in HDFS. "
+            + "In case of dirs: the storage space consumed by the directory."),
+    @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory', the total count of files under this dir. "
+            + "Won't be populated for other types of HDFS objects."),
+    @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory', the total count of directories under this dir (including itself). "
+            + "Won't be populated for other types of HDFS objects."),
+    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file."),
+    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, "
+            + "3 for the group, and 3 for other users. For example rw-rw-r--."),
+    @WritesAttribute(attribute="hdfs.status", description="The status contains a comma-separated list of file/dir paths which couldn't be listed/accessed. "
+            + "Status won't be set if no errors occurred."),
+    @WritesAttribute(attribute="hdfs.full.tree", description="When the destination is 'Attributes', will be populated with the full tree of the HDFS directory in JSON format.")
+})
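+// Illustrative only (editor's sketch, not part of the PR): for a hypothetical
+// directory '/data/logs' owned by user 'hdfs', a listing entry might carry
+// attributes roughly like:
+//   hdfs.objectName  = logs
+//   hdfs.path        = /data
+//   hdfs.type        = directory
+//   hdfs.owner       = hdfs
+//   hdfs.group       = supergroup
+//   hdfs.count.files = 42
+//   hdfs.permissions = rwxr-xr-x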
+@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class})
+public class GetHDFSFileInfo extends AbstractHadoopProcessor {
+    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+    public static final PropertyDescriptor FULL_PATH = new PropertyDescriptor.Builder()
+            .displayName("Full path")
+            .name("gethdfsfileinfo-full-path")
+            .description("A directory to start listing from, or a file's full path.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
+            .displayName("Recurse Subdirectories")
+            .name("gethdfsfileinfo-recurse-subdirs")
+            .description("Indicates whether to list files from subdirectories of the HDFS directory")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DIR_FILTER = new PropertyDescriptor.Builder()
+            .displayName("Directory Filter")
+            .name("gethdfsfileinfo-dir-filter")
+            .description("Regex. Only directories whose names match the given regular expression will be picked up. "
+                    + "If not provided, no filtering will be applied (for performance reasons).")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+            .displayName("File Filter")
+            .name("gethdfsfileinfo-file-filter")
+            .description("Regex. Only files whose names match the given regular expression will be picked up. "
+                    + "If not provided, no filtering will be applied (for performance reasons).")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new PropertyDescriptor.Builder()
+            .displayName("Exclude Files")
+            .name("gethdfsfileinfo-file-exclude-filter")
+            .description("Regex. Files whose names match the given regular expression will not be picked up. "
+                    + "If not provided, no exclusion filtering will be applied (for performance reasons).")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new PropertyDescriptor.Builder()
+            .displayName("Ignore Dotted Directories")
+            .name("gethdfsfileinfo-ignore-dotted-dirs")
+            .description("If true, directories whose names begin with a dot (\".\") will be ignored")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+            .displayName("Ignore Dotted Files")
+            .name("gethdfsfileinfo-ignore-dotted-files")
+            .description("If true, files whose names begin with a dot (\".\") will be ignored")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final AllowableValue GROUP_ALL = new AllowableValue("gethdfsfileinfo-group-all", "All",
+            "Group all results into a single FlowFile.");
+
+    static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory",
+            "Group HDFS objects by their parent directories only. The processor will generate a FlowFile for each directory (if recursive). "
+            + "If the 'Recurse Subdirectories' property is set to 'false', this will have the same effect as 'All'.");
+
+    static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None",
+            "Don't group results. Generate a FlowFile for each HDFS object.");
+
+    public static final PropertyDescriptor GROUPING = new PropertyDescriptor.Builder()
+            .displayName("Group Results")
+            .name("gethdfsfileinfo-group")
+            .description("Defines how HDFS objects are grouped into FlowFiles")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE)
+            .defaultValue(GROUP_ALL.getValue())
+            .build();
+
+    static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes",
+            "Details of the given HDFS object will be stored in the FlowFile's attributes");
+
+    static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content",
+            "Details of the given HDFS object will be stored in the FlowFile's content, in JSON format");
+
+    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .displayName("Destination")
+            .name("gethdfsfileinfo-destination")
+            .description("Sets the destination for the results. When set to 'Content', the FlowFile's attributes won't be used to store the results.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
+            .defaultValue(DESTINATION_CONTENT.getValue())
+            .build();
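+
+// Illustrative only (editor's sketch, not part of the PR): with Destination =
+// 'Content', a FlowFile's content might carry one JSON object per HDFS entry,
+// roughly along these lines (field names are guesses for illustration; the
+// actual schema is defined by this processor's JSON serialization):
+//   {"objectName":"logs","path":"/data","type":"directory","owner":"hdfs",
+//    "group":"supergroup","lastModified":1524492857000,"length":323480,
+//    "permissions":"rwxr-xr-x","countFiles":42,"countDirs":3}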
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All successfully generated FlowFiles are transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("If no objects are found, the original FlowFile is transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Original FlowFiles are transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All failed attempts to access HDFS will be routed to this relationship")
+            .build();
+
+ private HDFSFileInfoRequest req;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ super.init(context);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> props = new ArrayList<>(properties);
+ props.add(FULL_PATH);
+ props.add(RECURSE_SUBDIRS);
+ props.add(DIR_FILTER);
+ props.add(FILE_FILTER);
+ props.add(FILE_EXCLUDE_FILTER);
+ props.add(IGNORE_DOTTED_DIRS);
+ props.add(IGNORE_DOTTED_FILES);
+ props.add(GROUPING);
+ props.add(DESTINATION);
+ return props;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_NOT_FOUND);
+ relationships.add(REL_ORIGINAL);
+ relationships.add(REL_FAILURE);
+ return relationships;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext context) {
+ return super.customValidate(context);
+ }
+
+ @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ // drop request details to rebuild it
+ req = null;
+ }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile ff = null;
+        if (context.hasIncomingConnection()) {
+            ff = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops, then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (ff == null && context.hasNonLoopConnection()) {
+                context.yield();
+                return;
+            }
+        }
+        boolean scheduledFF = false;
+        if (ff == null) {
+            ff = session.create();
+            scheduledFF = true;
+        }
+        try {
+            if (req == null) {
+                // rebuild the request details based on the current properties and FlowFile attributes
+                req = buildRequestDetails(context, session, ff);
+            } else {
+                // avoid rebuilding the req object's patterns in order to have better performance
+                req = updateRequestDetails(context, session, ff);
+            }
+        } catch (Exception e) {
+            getLogger().error("Invalid properties: ", e);
+            if (scheduledFF) {
+                session.remove(ff);
+                context.yield();
+            } else {
+                ff = session.penalize(ff);
+                session.rollback();
+            }
+            return;
+        }
+        try {
+            final FileSystem hdfs = getFileSystem();
+            UserGroupInformation ugi = getUserGroupInformation();
+            HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false);
+            if (res == null) {
+                session.transfer(ff, REL_NOT_FOUND);
--- End diff ---
@bbende, good catch. Although in such a case the processor's properties would be used to compute the path and other params, it would still be nice to have the computed values available at run time. Adding this info now.
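For illustration only, a rough sketch of what recording the computed values as FlowFile attributes could look like (the attribute names and 'req' fields below are assumptions for the sketch, not the PR's actual identifiers):

    // Sketch: capture run-time computed request values on the outgoing FlowFile.
    Map<String, String> computed = new HashMap<>();
    computed.put("gethdfsfileinfo.computed.full.path", req.fullPath);                       // assumed field
    computed.put("gethdfsfileinfo.computed.recursive", Boolean.toString(req.isRecursive));  // assumed field
    ff = session.putAllAttributes(ff, computed);
    session.transfer(ff, REL_NOT_FOUND);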
> Add GetHdfsFileInfo Processor
> -----------------------------
>
> Key: NIFI-4906
> URL: https://issues.apache.org/jira/browse/NIFI-4906
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Ed Berezitsky
> Assignee: Ed Berezitsky
> Priority: Major
> Labels: patch, pull-request-available
> Attachments: NiFi-GetHDFSFileInfo.pdf, gethdfsfileinfo.patch
>
>
> Add *GetHdfsFileInfo* Processor to be able to get stats from a file system.
> This processor should support recursive scans, getting information on
> directories and files.
> _File-level info required_: name, path, length, modified timestamp, last
> access timestamp, owner, group, permissions.
> _Directory-level info required_: name, path, sum of lengths of files under a
> dir, count of files under a dir, modified timestamp, last access timestamp,
> owner, group, permissions.
>
> The result returned:
> * in a single flow file (in content - a JSON line per file/dir info);
> * a flow file per each file/dir info (in content as a JSON object, or in a set
> of attributes, by choice).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)