tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253194313
##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult>
customValidate(ValidationContext context)
problems.add(new
ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
.explanation(MIN_AGE.getDisplayName() + " cannot be
greater than " + MAX_AGE.getDisplayName()).build());
}
-
return problems;
}
- protected String getKey(final String directory) {
- return getIdentifier() + ".lastListingTime." + directory;
- }
-
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) ||
descriptor.equals(FILE_FILTER))) {
- this.resetState = true;
+ resetState = true;
}
}
- /**
- * Determines which of the given FileStatus's describes a File that should
be listed.
- *
- * @param statuses the eligible FileStatus objects that we could
potentially list
- * @param context processor context with properties values
- * @return a Set containing only those FileStatus objects that we want to
list
- */
- Set<FileStatus> determineListable(final Set<FileStatus> statuses,
ProcessContext context) {
- final long minTimestamp = this.latestTimestampListed;
- final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
- final Long minAgeProp =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- // NIFI-4144 - setting to MIN_VALUE so that in case the file
modification time is in
- // the future relative to the nifi instance, files are not skipped.
- final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE :
minAgeProp;
- final Long maxAgeProp =
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE :
maxAgeProp;
-
- // Build a sorted map to determine the latest possible entries
- for (final FileStatus status : statuses) {
- if (status.getPath().getName().endsWith("_COPYING_")) {
- continue;
- }
-
- final long fileAge = System.currentTimeMillis() -
status.getModificationTime();
- if (minimumAge > fileAge || fileAge > maximumAge) {
- continue;
- }
-
- final long entityTimestamp = status.getModificationTime();
-
- if (entityTimestamp > latestTimestampListed) {
- latestTimestampListed = entityTimestamp;
- }
-
- // New entries are all those that occur at or after the associated
timestamp
- final boolean newEntry = entityTimestamp >= minTimestamp &&
entityTimestamp > latestTimestampEmitted;
-
- if (newEntry) {
- List<FileStatus> entitiesForTimestamp =
orderedEntries.get(status.getModificationTime());
- if (entitiesForTimestamp == null) {
- entitiesForTimestamp = new ArrayList<FileStatus>();
- orderedEntries.put(status.getModificationTime(),
entitiesForTimestamp);
- }
- entitiesForTimestamp.add(status);
- }
- }
-
- final Set<FileStatus> toList = new HashSet<>();
-
- if (orderedEntries.size() > 0) {
- long latestListingTimestamp = orderedEntries.lastKey();
-
- // If the last listing time is equal to the newest entries
previously seen,
- // another iteration has occurred without new files and special
handling is needed to avoid starvation
- if (latestListingTimestamp == minTimestamp) {
- // We are done if the latest listing timestamp is equal to the
last processed time,
- // meaning we handled those items originally passed over
- if (latestListingTimestamp == latestTimestampEmitted) {
- return Collections.emptySet();
- }
- } else {
- // Otherwise, newest entries are held back one cycle to avoid
issues in writes occurring exactly when the listing is being performed to avoid
missing data
- orderedEntries.remove(latestListingTimestamp);
- }
-
- for (List<FileStatus> timestampEntities : orderedEntries.values())
{
- for (FileStatus status : timestampEntities) {
- toList.add(status);
- }
- }
- }
-
- return toList;
- }
-
@OnScheduled
public void resetStateIfNecessary(final ProcessContext context) throws
IOException {
if (resetState) {
- getLogger().debug("Property has been modified. Resetting the state
values - listing.timestamp and emitted.timestamp to -1L");
+ getLogger().debug("Property has been modified. Resetting the state
values.");
context.getStateManager().clear(Scope.CLUSTER);
- this.resetState = false;
+ resetState = false;
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- // We have to ensure that we don't continually perform listings,
because if we perform two listings within
- // the same millisecond, our algorithm for comparing timestamps will
not work. So we ensure here that we do
- // not let that happen.
- final long now = System.nanoTime();
- if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
- lastRunTimestamp = now;
- context.yield();
- return;
- }
- lastRunTimestamp = now;
-
// Ensure that we are using the latest listing information before we
try to perform a listing of HDFS files.
try {
- final StateMap stateMap = session.getState(Scope.CLUSTER);
- if (!stateMap.getStateVersion().isPresent()) {
- latestTimestampEmitted = -1L;
- latestTimestampListed = -1L;
- getLogger().debug("Found no state stored");
- } else {
- // Determine if state is stored in the 'new' format or the
'old' format
- final String emittedString =
stateMap.get(EMITTED_TIMESTAMP_KEY);
- if (emittedString == null) {
- latestTimestampEmitted = -1L;
- latestTimestampListed = -1L;
- getLogger().debug("Found no recognized state keys;
assuming no relevant state and resetting listing/emitted time to -1");
- } else {
- // state is stored in the new format, using just two
timestamps
- latestTimestampEmitted = Long.parseLong(emittedString);
- final String listingTimestmapString =
stateMap.get(LISTING_TIMESTAMP_KEY);
- if (listingTimestmapString != null) {
- latestTimestampListed =
Long.parseLong(listingTimestmapString);
- }
-
- getLogger().debug("Found new-style state stored, latesting
timestamp emitted = {}, latest listed = {}",
- new Object[] {latestTimestampEmitted,
latestTimestampListed});
- }
+ latestTimestamp = 0L;
+ latestFiles = new ArrayList<>();
+ StateMap stateMap = session.getState(Scope.CLUSTER);
+ String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+ final String legacyLatestListingTimestampString =
stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+ final String legacyLatestEmittedTimestampString =
stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+ if (legacyLatestListingTimestampString != null) {
+ final long legacyLatestListingTimestamp =
Long.parseLong(legacyLatestListingTimestampString);
+ final long legacyLatestEmittedTimestamp =
Long.parseLong(legacyLatestEmittedTimestampString);
+ latestTimestamp = legacyLatestListingTimestamp ==
legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 :
legacyLatestListingTimestamp;
+ getLogger().debug("Transitioned from legacy state to new
state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp':
{}'," +
+ "'latestTimestamp': {}", legacyLatestListingTimestamp,
legacyLatestEmittedTimestamp, latestTimestamp);
+ } else if (latestTimestampString != null) {
+ latestTimestamp = Long.parseLong(latestTimestampString);
+ this.latestFiles = stateMap.toMap().entrySet().stream()
+ .filter(entry ->
entry.getKey().startsWith("latest.file"))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
}
- } catch (final IOException ioe) {
+ } catch (IOException e) {
getLogger().error("Failed to retrieve timestamp of last listing
from the State Manager. Will not perform listing until this is accomplished.");
context.yield();
return;
}
// Pull in any file that is newer than the timestamp that we have.
- final FileSystem hdfs = getFileSystem();
- final boolean recursive =
context.getProperty(RECURSE_SUBDIRS).asBoolean();
- String fileFilterMode =
context.getProperty(FILE_FILTER_MODE).getValue();
+ try (final FileSystem hdfs = getFileSystem()) {
+ final boolean recursive =
context.getProperty(RECURSE_SUBDIRS).asBoolean();
+ final PathFilter pathFilter = createPathFilter(context);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final Set<FileStatus> statuses;
- try {
+ final FileStatusManager fileStatusManager = new
FileStatusManager();
Review Comment:
I agree with @turcsanyip except the last part. Conceptually, I would advise
against initializing a final field from a non-final running field, even if it
hasn't changed at that point yet. I would keep those fields in the object
writer constructor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]