tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1255861882
##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -36,642 +33,306 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.deprecation.log.DeprecationLogger;
-import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.processors.hadoop.util.FilterMode;
+import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
+import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
+import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-import java.io.File;
+
import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;

@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source",
"filesystem"})
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
-@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a
listing is performed, the files with the latest timestamp will be excluded "
- + "and picked up during the next execution of the processor. This is
done to ensure that we do not miss any files, or produce duplicates, in the "
- + "cases where files with the same timestamp are written immediately
before and after a single execution of the processor. For each file that is "
- + "listed in HDFS, this processor creates a FlowFile that represents
the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is "
- + "designed to run on Primary Node only in a cluster. If the primary
node changes, the new Primary Node will pick up where the previous node left "
- + "off without duplicating all of the data. Unlike GetHDFS, this
Processor does not delete any data from HDFS.")
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file
that is listed in HDFS, this processor creates a FlowFile that represents "
+ + "the HDFS file to be fetched in conjunction with FetchHDFS. This
Processor is designed to run on Primary Node only in a cluster. If the primary "
+ + "node changes, the new Primary Node will pick up where the previous
node left off without duplicating all of the data. Unlike GetHDFS, this "
+ + "Processor does not delete any data from HDFS.")
@WritesAttributes({
- @WritesAttribute(attribute="filename", description="The name of the file
that was read from HDFS."),
- @WritesAttribute(attribute="path", description="The path is set to the
absolute path of the file's directory on HDFS. For example, if the Directory
property is set to /tmp, "
- + "then files picked up from /tmp will have the path attribute set
to \"./\". If the Recurse Subdirectories property is set to true and a file is
picked up "
- + "from /tmp/abc/1/2/3, then the path attribute will be set to
\"/tmp/abc/1/2/3\"."),
- @WritesAttribute(attribute="hdfs.owner", description="The user that owns
the file in HDFS"),
- @WritesAttribute(attribute="hdfs.group", description="The group that owns
the file in HDFS"),
- @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp
of when the file in HDFS was last modified, as milliseconds since midnight Jan
1, 1970 UTC"),
- @WritesAttribute(attribute="hdfs.length", description="The number of bytes
in the file in HDFS"),
- @WritesAttribute(attribute="hdfs.replication", description="The number of
HDFS replicas for hte file"),
- @WritesAttribute(attribute="hdfs.permissions", description="The
permissions for the file in HDFS. This is formatted as 3 characters for the
owner, "
- + "3 for the group, and 3 for other users. For example rw-rw-r--")
+ @WritesAttribute(attribute = "filename", description = "The name of
the file that was read from HDFS."),
+ @WritesAttribute(attribute = "path", description = "The path is set to
the absolute path of the file's directory on HDFS. For example, if the
Directory property is set to /tmp, "
+ + "then files picked up from /tmp will have the path attribute
set to \"./\". If the Recurse Subdirectories property is set to true and a file
is picked up "
+ + "from /tmp/abc/1/2/3, then the path attribute will be set to
\"/tmp/abc/1/2/3\"."),
+ @WritesAttribute(attribute = "hdfs.owner", description = "The user
that owns the file in HDFS"),
+ @WritesAttribute(attribute = "hdfs.group", description = "The group
that owns the file in HDFS"),
+ @WritesAttribute(attribute = "hdfs.lastModified", description = "The
timestamp of when the file in HDFS was last modified, as milliseconds since
midnight Jan 1, 1970 UTC"),
+ @WritesAttribute(attribute = "hdfs.length", description = "The number
of bytes in the file in HDFS"),
+ @WritesAttribute(attribute = "hdfs.replication", description = "The
number of HDFS replicas for hte file"),
+ @WritesAttribute(attribute = "hdfs.permissions", description = "The
permissions for the file in HDFS. This is formatted as 3 characters for the
owner, "
+ + "3 for the group, and 3 for other users. For example
rw-rw-r--")
})
-@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed and the latest "
-    + "timestamp of all the files transferred are both stored. This allows the Processor to list only files that have been added or modified after "
-    + "this date the next time that the Processor is run, without having to store all of the actual filenames/paths which could lead to performance "
-    + "problems. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
-    + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed is stored. "
+    + "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run, "
+    + "without having to store all of the actual filenames/paths which could lead to performance problems. State is stored across the cluster "
+    + "so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up where the previous "
+    + "node left off, without duplicating the data.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class ListHDFS extends AbstractHadoopProcessor {
- private static final RecordSchema RECORD_SCHEMA;
- private static final String FILENAME = "filename";
- private static final String PATH = "path";
- private static final String IS_DIRECTORY = "directory";
- private static final String SIZE = "size";
- private static final String LAST_MODIFIED = "lastModified";
- private static final String PERMISSIONS = "permissions";
- private static final String OWNER = "owner";
- private static final String GROUP = "group";
- private static final String REPLICATION = "replication";
- private static final String IS_SYM_LINK = "symLink";
- private static final String IS_ENCRYPTED = "encrypted";
- private static final String IS_ERASURE_CODED = "erasureCoded";
-
- static {
- final List<RecordField> recordFields = new ArrayList<>();
-        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
-        recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
-        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
-        recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
-        recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
-        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
- }
+ private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
-        .name("Recurse Subdirectories")
-        .description("Indicates whether to list files from subdirectories of the HDFS directory")
-        .required(true)
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .build();
+            .name("Recurse Subdirectories")
+            .description("Indicates whether to list files from subdirectories of the HDFS directory")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
-        .name("record-writer")
-        .displayName("Record Writer")
-        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
-            "all entities will be written to a single FlowFile.")
-        .required(false)
-        .identifiesControllerService(RecordSetWriterFactory.class)
-        .build();
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each "
+                    + "entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile.")
+            .required(false)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
-        .name("File Filter")
-        .description("Only files whose names match the given regular expression will be picked up")
-        .required(true)
-        .defaultValue("[^\\.].*")
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files";
-    private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only";
-    private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
-    static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
-        "Directories and Files",
-        "Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getDisplayName()
-            + " is set to true, only subdirectories with a matching name will be searched for files that match "
-            + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
-        "Files Only",
-        "Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getDisplayName()
-            + " is set to true, the entire subdirectory tree will be searched for files that match "
-            + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
-        "Full Path",
-        "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
-            + " against the full path of files with and without the scheme and authority. If "
-            + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
-            + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ". See 'Additional Details' for more information.");
+            .name("File Filter")
+            .description("Only files whose names match the given regular expression will be picked up")
+            .required(true)
+            .defaultValue(NON_HIDDEN_FILES_REGEX)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
    public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
-        .name("file-filter-mode")
-        .displayName("File Filter Mode")
-        .description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
-        .required(true)
-        .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
-        .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
-        .name("minimum-file-age")
-        .displayName("Minimum File Age")
-        .description("The minimum age that a file must be in order to be pulled; any file younger than this "
-            + "amount of time (based on last modification date) will be ignored")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
-
-    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
-        .name("maximum-file-age")
-        .displayName("Maximum File Age")
-        .description("The maximum age that a file must be in order to be pulled; any file older than this "
-            + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
+            .name("file-filter-mode")
+            .displayName("File Filter Mode")
+            .description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
+            .required(true)
+            .allowableValues(FilterMode.class)
+            .defaultValue(FILTER_DIRECTORIES_AND_FILES.getValue())
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MINIMUM_FILE_AGE = new PropertyDescriptor.Builder()
+            .name("minimum-file-age")
+            .displayName("Minimum File Age")
+            .description("The minimum age that a file must be in order to be pulled; any file younger than this "
+                    + "amount of time (based on last modification date) will be ignored")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
+
+    public static final PropertyDescriptor MAXIMUM_FILE_AGE = new PropertyDescriptor.Builder()
+            .name("maximum-file-age")
+            .displayName("Maximum File Age")
+            .description("The maximum age that a file must be in order to be pulled; any file older than this "
+                    + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles are transferred to this relationship")
- .build();
-
- private static final DeprecationLogger deprecationLogger =
DeprecationLoggerFactory.getLogger(ListHDFS.class);
-
- private volatile long latestTimestampListed = -1L;
- private volatile long latestTimestampEmitted = -1L;
- private volatile long lastRunTimestamp = -1L;
- private volatile boolean resetState = false;
- static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
- static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
-
- static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+ .name("success")
+ .description("All FlowFiles are transferred to this relationship")
+ .build();
+ public static final String LEGACY_EMITTED_TIMESTAMP_KEY =
"emitted.timestamp";
+ public static final String LEGACY_LISTING_TIMESTAMP_KEY =
"listing.timestamp";
+ public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
+ public static final String LATEST_FILES_KEY = "latest.file.%d";
+
+ private static final Set<Relationship> RELATIONSHIPS =
Collections.singleton(REL_SUCCESS);
private Pattern fileFilterRegexPattern;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- super.init(context);
- }
+ private volatile boolean resetState = false;
@Override
    protected void preProcessConfiguration(Configuration config, ProcessContext context) {
        super.preProcessConfiguration(config, context);
        // Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
        fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
-
- }
-
- protected File getPersistenceFile() {
- return new File("conf/state/" + getIdentifier());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
- props.add(DIRECTORY);
- props.add(RECURSE_SUBDIRS);
- props.add(RECORD_WRITER);
- props.add(FILE_FILTER);
- props.add(FILE_FILTER_MODE);
- props.add(MIN_AGE);
- props.add(MAX_AGE);
+        props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
return props;
}
@Override
public Set<Relationship> getRelationships() {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- return relationships;
+ return RELATIONSHIPS;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
final List<ValidationResult> problems = new
ArrayList<>(super.customValidate(context));
- final Long minAgeProp =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- final Long maxAgeProp =
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long minAgeProp =
context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeProp =
context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE :
maxAgeProp;
if (minimumAge > maximumAge) {
- problems.add(new
ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
- .explanation(MIN_AGE.getDisplayName() + " cannot be
greater than " + MAX_AGE.getDisplayName()).build());
+ problems.add(new
ValidationResult.Builder().valid(false).subject("ListHDFS Configuration")
+ .explanation(MINIMUM_FILE_AGE.getDisplayName() + " cannot
be greater than " + MAXIMUM_FILE_AGE.getDisplayName()).build());
}
-
return problems;
}
- protected String getKey(final String directory) {
- return getIdentifier() + ".lastListingTime." + directory;
- }
-
@Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        super.onPropertyModified(descriptor, oldValue, newValue);
        if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
- this.resetState = true;
- }
- }
-
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
+            resetState = true;
        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
}
@OnScheduled
    public void resetStateIfNecessary(final ProcessContext context) throws IOException {
        if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
            context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
}
}
@Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
        // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
+        final long latestTimestamp;
+        final List<String> latestFiles;
        try {
            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
+            final String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                latestFiles = new ArrayList<>();
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                            new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+                latestTimestamp = 0L;
+                latestFiles = new ArrayList<>();
            }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
            getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
            context.yield();
            return;
        }
// Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
            final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", e);
-            return;
-        }
+            final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
-                }
-            } else {
-                createFlowFiles(listable, session);
-            }
-        }
-
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
-        }
-
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
-        updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
-        getLogger().debug("New state map: {}", new Object[] {updatedState});
-
-        try {
-            session.setState(updatedState, Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
-        }
-
-        final int listCount = listable.size();
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
-            session.commitAsync();
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-        }
-    }
+            final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+            final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-    private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
-        for (final FileStatus status : fileStatuses) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, getSuccessRelationship());
-        }
-    }
-
-    private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        FlowFile flowFile = session.create();
-        final WriteResult writeResult;
-        try (final OutputStream out = session.write(flowFile);
-             final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
-
-            recordSetWriter.beginRecordSet();
-            for (final FileStatus fileStatus : fileStatuses) {
-                final Record record = createRecord(fileStatus);
-                recordSetWriter.write(record);
+            final HadoopFileStatusWriter writer;
+            if (writerFactory == null) {
+                writer = new FlowFileObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, latestFiles);
+            } else {
+                writer = new RecordObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
+                        latestFiles, writerFactory, getLogger());
            }
-            writeResult = recordSetWriter.finishRecordSet();
-        }
+            writer.write();
-        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
-        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-        flowFile = session.putAllAttributes(flowFile, attributes);
-
-        session.transfer(flowFile, getSuccessRelationship());
-    }
-
-    private Record createRecord(final FileStatus fileStatus) {
-        final Map<String, Object> values = new HashMap<>();
-        values.put(FILENAME, fileStatus.getPath().getName());
-        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
-        values.put(OWNER, fileStatus.getOwner());
-        values.put(GROUP, fileStatus.getGroup());
-        values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
-        values.put(SIZE, fileStatus.getLen());
-        values.put(REPLICATION, fileStatus.getReplication());
-
-        final FsPermission permission = fileStatus.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        values.put(PERMISSIONS, perms);
-
-        values.put(IS_DIRECTORY, fileStatus.isDirectory());
-        values.put(IS_SYM_LINK, fileStatus.isSymlink());
-        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
-        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
-        return new MapRecord(getRecordSchema(), values);
-    }
-
-    private RecordSchema getRecordSchema() {
-        return RECORD_SCHEMA;
-    }
+            getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
- private Set<FileStatus> getStatuses(final Path path, final boolean
recursive, final FileSystem hdfs, final PathFilter filter, String filterMode)
throws IOException, InterruptedException {
- final Set<FileStatus> statusSet = new HashSet<>();
-
- getLogger().debug("Fetching listing for {}", new Object[] {path});
- final FileStatus[] statuses;
- if (isPostListingFilterNeeded(filterMode)) {
- // For this filter mode, the filter is not passed to listStatus,
so that directory names will not be
- // filtered out when the listing is recursive.
- statuses =
getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () ->
hdfs.listStatus(path));
- } else {
- statuses =
getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () ->
hdfs.listStatus(path, filter));
- }
-
- for ( final FileStatus status : statuses ) {
- if ( status.isDirectory() ) {
- if ( recursive ) {
- try {
- statusSet.addAll(getStatuses(status.getPath(),
recursive, hdfs, filter, filterMode));
- } catch (final IOException ioe) {
- getLogger().error("Failed to retrieve HDFS listing for
subdirectory {} due to {}; will continue listing others", new Object[]
{status.getPath(), ioe});
- }
+ if (writer.getListedFileCount() > 0) {
+ final Map<String, String> updatedState = new HashMap<>();
+ updatedState.put(LATEST_TIMESTAMP_KEY,
String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+ final List<String> files =
fileStatusManager.getCurrentLatestFiles();
+ for (int i = 0; i < files.size(); i++) {
+ final String currentFilePath = files.get(i);
+ updatedState.put(String.format(LATEST_FILES_KEY, i),
currentFilePath);
}
+ getLogger().debug("New state map: {}", updatedState);
+ updateState(session, updatedState);
+
+ getLogger().info("Successfully created listing with {} new
files from HDFS", writer.getListedFileCount());
} else {
- if (isPostListingFilterNeeded(filterMode)) {
- // Filtering explicitly performed here, since it was not
able to be done when calling listStatus.
- if (filter.accept(status.getPath())) {
- statusSet.add(status);
- }
- } else {
- statusSet.add(status);
- }
+ getLogger().debug("There is no data to list. Yielding.");
+ context.yield();
}
+ } catch (IOException e) {
+ throw new ProcessException("IO error occurred when closing HDFS
file system", e);
}
-
- return statusSet;
- }
-
-    /**
-     * Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the
-     * given filter mode.
-     * Filter modes that need to be able to search directories regardless of the given filter should return true.
-     * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked
-     * without a filter so that all directories can be traversed when filtering with these modes.
-     * FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with
-     * {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing.
-     * @param filterMode the value of one of the defined AllowableValues representing filter modes
-     * @return true if results need to be filtered, false otherwise
-     */
-    private boolean isPostListingFilterNeeded(String filterMode) {
-        return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH);
-    }
-
-    private String getAbsolutePath(final Path path) {
-        final Path parent = path.getParent();
-        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
-        return prefix + "/" + path.getName();
-    }
-
-    private Map<String, String> createAttributes(final FileStatus status) {
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
-        attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
-
-        attributes.put(getAttributePrefix() + ".owner", status.getOwner());
-        attributes.put(getAttributePrefix() + ".group", status.getGroup());
-        attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime()));
-        attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen()));
-        attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication()));
-
-        final FsPermission permission = status.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        attributes.put(getAttributePrefix() + ".permissions", perms);
-        return attributes;
-    }
-
-    private String getPerms(final FsAction action) {
-        final StringBuilder sb = new StringBuilder();
-        if (action.implies(FsAction.READ)) {
-            sb.append("r");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.WRITE)) {
-            sb.append("w");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.EXECUTE)) {
-            sb.append("x");
-        } else {
-            sb.append("-");
-        }
-
-        return sb.toString();
}
    private PathFilter createPathFilter(final ProcessContext context) {
-        final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
-        return path -> {
-            final boolean accepted;
-            if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
-                accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
+        final FilterMode filterMode = FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());
+        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+
+        switch (filterMode) {
+            case FILTER_MODE_FILES_ONLY:
+                return path -> fileFilterRegexPattern.matcher(path.getName()).matches();
+            case FILTER_MODE_FULL_PATH:
+                return path -> fileFilterRegexPattern.matcher(path.toString()).matches()
                        || fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
-            } else {
-                accepted = fileFilterRegexPattern.matcher(path.getName()).matches();
-            }
-            return accepted;
-        };
+            // FILTER_DIRECTORIES_AND_FILES
+            default:
+                return path -> Stream.of(path.toString().split("/"))
Review Comment:
This case is not handling the path properly: it operates on the entire URL, including the scheme and authority, which can lead to dropping files that should be listed.
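
For example (hypothetical values, not from the PR): for a file at hdfs://namenode:8020/tmp/data/file.txt, path.toString().split("/") produces the segments "hdfs:", "", "namenode:8020", "tmp", "data" and "file.txt", so the scheme and authority are also tested against the File Filter regex, and a restrictive filter rejects them and, with them, the file. A minimal sketch of one possible fix, assuming the intent here is to match each real path segment against the filter; it reuses Path.getPathWithoutSchemeAndAuthority, which the FILTER_MODE_FULL_PATH case above already calls:

    // Sketch only: strip the scheme and authority before splitting, and skip the
    // empty first segment produced by the leading "/".
    default:
        return path -> Stream.of(Path.getPathWithoutSchemeAndAuthority(path).toString().split("/"))
                .filter(segment -> !segment.isEmpty())
                .allMatch(segment -> fileFilterRegexPattern.matcher(segment).matches());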