turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226928846
##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -319,131 +240,36 @@ public void onPropertyModified(final PropertyDescriptor
descriptor, final String
}
}
- /**
- * Determines which of the given FileStatus's describes a File that should
be listed.
- *
- * @param statuses the eligible FileStatus objects that we could
potentially list
- * @param context processor context with properties values
- * @return a Set containing only those FileStatus objects that we want to
list
- */
- Set<FileStatus> determineListable(final Set<FileStatus> statuses,
ProcessContext context) {
- final long minTimestamp = this.latestTimestampListed;
- final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
- final Long minAgeProp =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- // NIFI-4144 - setting to MIN_VALUE so that in case the file
modification time is in
- // the future relative to the nifi instance, files are not skipped.
- final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE :
minAgeProp;
- final Long maxAgeProp =
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE :
maxAgeProp;
-
- // Build a sorted map to determine the latest possible entries
- for (final FileStatus status : statuses) {
- if (status.getPath().getName().endsWith("_COPYING_")) {
- continue;
- }
-
- final long fileAge = System.currentTimeMillis() -
status.getModificationTime();
- if (minimumAge > fileAge || fileAge > maximumAge) {
- continue;
- }
-
- final long entityTimestamp = status.getModificationTime();
-
- if (entityTimestamp > latestTimestampListed) {
- latestTimestampListed = entityTimestamp;
- }
-
- // New entries are all those that occur at or after the associated
timestamp
- final boolean newEntry = entityTimestamp >= minTimestamp &&
entityTimestamp > latestTimestampEmitted;
-
- if (newEntry) {
- List<FileStatus> entitiesForTimestamp =
orderedEntries.get(status.getModificationTime());
- if (entitiesForTimestamp == null) {
- entitiesForTimestamp = new ArrayList<FileStatus>();
- orderedEntries.put(status.getModificationTime(),
entitiesForTimestamp);
- }
- entitiesForTimestamp.add(status);
- }
- }
-
- final Set<FileStatus> toList = new HashSet<>();
-
- if (orderedEntries.size() > 0) {
- long latestListingTimestamp = orderedEntries.lastKey();
-
- // If the last listing time is equal to the newest entries
previously seen,
- // another iteration has occurred without new files and special
handling is needed to avoid starvation
- if (latestListingTimestamp == minTimestamp) {
- // We are done if the latest listing timestamp is equal to the
last processed time,
- // meaning we handled those items originally passed over
- if (latestListingTimestamp == latestTimestampEmitted) {
- return Collections.emptySet();
- }
- } else {
- // Otherwise, newest entries are held back one cycle to avoid
issues in writes occurring exactly when the listing is being performed to avoid
missing data
- orderedEntries.remove(latestListingTimestamp);
- }
-
- for (List<FileStatus> timestampEntities : orderedEntries.values())
{
- for (FileStatus status : timestampEntities) {
- toList.add(status);
- }
- }
- }
-
- return toList;
- }
-
@OnScheduled
public void resetStateIfNecessary(final ProcessContext context) throws
IOException {
if (resetState) {
- getLogger().debug("Property has been modified. Resetting the state
values - listing.timestamp and emitted.timestamp to -1L");
+ getLogger().debug("Property has been modified. Resetting the state
values.");
context.getStateManager().clear(Scope.CLUSTER);
this.resetState = false;
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- // We have to ensure that we don't continually perform listings,
because if we perform two listings within
- // the same millisecond, our algorithm for comparing timestamps will
not work. So we ensure here that we do
- // not let that happen.
- final long now = System.nanoTime();
- if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
- lastRunTimestamp = now;
- context.yield();
- return;
- }
- lastRunTimestamp = now;
-
// Ensure that we are using the latest listing information before we
try to perform a listing of HDFS files.
try {
- final StateMap stateMap = session.getState(Scope.CLUSTER);
- if (!stateMap.getStateVersion().isPresent()) {
- latestTimestampEmitted = -1L;
- latestTimestampListed = -1L;
- getLogger().debug("Found no state stored");
- } else {
- // Determine if state is stored in the 'new' format or the
'old' format
- final String emittedString =
stateMap.get(EMITTED_TIMESTAMP_KEY);
- if (emittedString == null) {
- latestTimestampEmitted = -1L;
- latestTimestampListed = -1L;
- getLogger().debug("Found no recognized state keys;
assuming no relevant state and resetting listing/emitted time to -1");
- } else {
- // state is stored in the new format, using just two
timestamps
- latestTimestampEmitted = Long.parseLong(emittedString);
- final String listingTimestmapString =
stateMap.get(LISTING_TIMESTAMP_KEY);
- if (listingTimestmapString != null) {
- latestTimestampListed =
Long.parseLong(listingTimestmapString);
- }
-
- getLogger().debug("Found new-style state stored, latesting
timestamp emitted = {}, latest listed = {}",
- new Object[] {latestTimestampEmitted,
latestTimestampListed});
- }
+ latestModifiedStatuses = new ArrayList<>();
+ StateMap stateMap = session.getState(Scope.CLUSTER);
+ String latestListedTimestampString =
stateMap.get(LATEST_TIMESTAMP_KEY);
+ String latestFiles = stateMap.get(LATEST_FILES_KEY);
+
+ final String legacyLatestListingTimestampString =
stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+ final String legacyLatestEmittedTimestampString =
stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+ if (legacyLatestListingTimestampString != null) {
+ final long legacyLatestListingTimestamp =
Long.parseLong(legacyLatestListingTimestampString);
+ final long legacyLatestEmittedTimestamp =
Long.parseLong(legacyLatestEmittedTimestampString);
+ latestModificationTime = legacyLatestListingTimestamp ==
legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 :
legacyLatestListingTimestamp;
+ } else if (latestListedTimestampString != null) {
+ latestModificationTime =
Long.parseLong(latestListedTimestampString);
+ latestModifiedStatuses = new
ArrayList<>(Arrays.asList(latestFiles.split("\\s")));
Review Comment:
Are we sure that file and directory names cannot contain spaces within the
path?
Space-separated paths may not work, I'm afraid.
Just an idea, but other processors store the paths as separate entries in the
state, like `latest.file.1`, `latest.file.2`, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]