tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1199095603
##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -452,243 +305,201 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
// Pull in any file that is newer than the timestamp that we have.
final FileSystem hdfs = getFileSystem();
final boolean recursive =
context.getProperty(RECURSE_SUBDIRS).asBoolean();
- String fileFilterMode =
context.getProperty(FILE_FILTER_MODE).getValue();
+ final PathFilter pathFilter = createPathFilter(context);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final Set<FileStatus> statuses;
+ final HdfsObjectWriter writer = getHdfsObjectWriter(session,
writerFactory);
+
+ long listedFileCount = 0;
try {
final Path rootPath = getNormalizedPath(context, DIRECTORY);
- statuses = getStatuses(rootPath, recursive, hdfs,
createPathFilter(context), fileFilterMode);
- getLogger().debug("Found a total of {} files in HDFS", new
Object[] {statuses.size()});
- } catch (final IOException | IllegalArgumentException e) {
- getLogger().error("Failed to perform listing of HDFS", e);
- return;
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- getLogger().error("Interrupted while performing listing of HDFS",
e);
- return;
- }
-
- final Set<FileStatus> listable = determineListable(statuses, context);
- getLogger().debug("Of the {} files found in HDFS, {} are listable",
new Object[] {statuses.size(), listable.size()});
-
- // Create FlowFile(s) for the listing, if there are any
- if (!listable.isEmpty()) {
- if (context.getProperty(RECORD_WRITER).isSet()) {
- try {
- createRecords(listable, context, session);
- } catch (final IOException | SchemaNotFoundException e) {
- getLogger().error("Failed to write listing of HDFS", e);
- return;
+ final FileCountRemoteIterator<FileStatus> fileStatusIterator =
getFileStatusIterator(rootPath, recursive, hdfs, pathFilter);
+
+ final Long minAgeProp =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE :
minAgeProp;
+ final Long maxAgeProp =
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE :
maxAgeProp;
+
+ writer.beginListing();
+
+ FileStatus status;
+ while (fileStatusIterator.hasNext()) {
+ status = fileStatusIterator.next();
+ if (status != null && determineListable(status, minimumAge,
maximumAge, isTransitioningFromLegacyState, isLegacyLastStatusListed)) {
+ writer.addToListing(status);
+ fileStatusManager.update(status);
+ listedFileCount++;
}
- } else {
- createFlowFiles(listable, session);
}
- }
+ writer.finishListing();
- for (final FileStatus status : listable) {
- final long fileModTime = status.getModificationTime();
- if (fileModTime > latestTimestampEmitted) {
- latestTimestampEmitted = fileModTime;
- }
+ long totalFileCount = fileStatusIterator.getFileCount();
+ getLogger().debug("Found a total of {} files in HDFS, {} are
listed", totalFileCount, listedFileCount);
+ } catch (final IOException | IllegalArgumentException |
SchemaNotFoundException e) {
+ getLogger().error("Failed to perform listing of HDFS", e);
+ writer.finishListingExceptionally(e);
+ return;
}
- final Map<String, String> updatedState = new HashMap<>(1);
- updatedState.put(LISTING_TIMESTAMP_KEY,
String.valueOf(latestTimestampListed));
- updatedState.put(EMITTED_TIMESTAMP_KEY,
String.valueOf(latestTimestampEmitted));
- getLogger().debug("New state map: {}", new Object[] {updatedState});
+ if (listedFileCount > 0) {
+ fileStatusManager.finishIteration();
+ final Map<String, String> updatedState = new HashMap<>(1);
+ updatedState.put(LISTING_TIMESTAMP_KEY,
String.valueOf(fileStatusManager.getLastModificationTime()));
+ getLogger().debug("New state map: {}", updatedState);
+ updateState(session, updatedState);
- try {
- session.setState(updatedState, Scope.CLUSTER);
- } catch (final IOException ioe) {
- getLogger().warn("Failed to save cluster-wide state. If NiFi is
restarted, data duplication may occur", ioe);
- }
-
- final int listCount = listable.size();
- if ( listCount > 0 ) {
- getLogger().info("Successfully created listing with {} new files
from HDFS", new Object[] {listCount});
+ getLogger().info("Successfully created listing with {} new files
from HDFS", listedFileCount);
session.commitAsync();
} else {
getLogger().debug("There is no data to list. Yielding.");
context.yield();
}
}
- private void createFlowFiles(final Set<FileStatus> fileStatuses, final
ProcessSession session) {
- for (final FileStatus status : fileStatuses) {
- final Map<String, String> attributes = createAttributes(status);
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, getSuccessRelationship());
+ private HdfsObjectWriter getHdfsObjectWriter(final ProcessSession session,
final RecordSetWriterFactory writerFactory) {
+ final HdfsObjectWriter writer;
+ if (writerFactory == null) {
+ writer = new FlowFileObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger());
}
+ return writer;
}
- private void createRecords(final Set<FileStatus> fileStatuses, final
ProcessContext context, final ProcessSession session) throws IOException,
SchemaNotFoundException {
- final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
- FlowFile flowFile = session.create();
- final WriteResult writeResult;
- try (final OutputStream out = session.write(flowFile);
- final RecordSetWriter recordSetWriter =
writerFactory.createWriter(getLogger(), getRecordSchema(), out,
Collections.emptyMap())) {
+ private boolean notEnoughTimeElapsedToRun(final ProcessContext context) {
+ final long now = System.nanoTime();
+ if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
+ context.yield();
+ return true;
+ }
+ lastRunTimestamp = now;
+ return false;
+ }
- recordSetWriter.beginRecordSet();
- for (final FileStatus fileStatus : fileStatuses) {
- final Record record = createRecord(fileStatus);
- recordSetWriter.write(record);
+ private boolean determineListable(final FileStatus status, final long
minimumAge, final long maximumAge, final boolean
isTransitioningFromLegacyState, final boolean isLegacyLastStatusListed) {
+ // If the file was created during the processor's last iteration we
have to check if it was already listed
+ // If legacy state was used and the file was already listed once, we
don't want to list it once again.
+ if (status.getModificationTime() ==
fileStatusManager.getLastModificationTime()) {
+ if (isTransitioningFromLegacyState) {
+ return !isLegacyLastStatusListed;
}
-
- writeResult = recordSetWriter.finishRecordSet();
+ return
!fileStatusManager.getLastModifiedStatuses().contains(status);
}
- final Map<String, String> attributes = new
HashMap<>(writeResult.getAttributes());
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- flowFile = session.putAllAttributes(flowFile, attributes);
+ final long fileAge = System.currentTimeMillis() -
status.getModificationTime();
+ if (minimumAge > fileAge || fileAge > maximumAge) {
+ return false;
+ }
- session.transfer(flowFile, getSuccessRelationship());
+ return status.getModificationTime() >
fileStatusManager.getLastModificationTime();
}
- private Record createRecord(final FileStatus fileStatus) {
- final Map<String, Object> values = new HashMap<>();
- values.put(FILENAME, fileStatus.getPath().getName());
- values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
- values.put(OWNER, fileStatus.getOwner());
- values.put(GROUP, fileStatus.getGroup());
- values.put(LAST_MODIFIED, new
Timestamp(fileStatus.getModificationTime()));
- values.put(SIZE, fileStatus.getLen());
- values.put(REPLICATION, fileStatus.getReplication());
-
- final FsPermission permission = fileStatus.getPermission();
- final String perms = getPerms(permission.getUserAction()) +
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
- values.put(PERMISSIONS, perms);
-
- values.put(IS_DIRECTORY, fileStatus.isDirectory());
- values.put(IS_SYM_LINK, fileStatus.isSymlink());
- values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
- values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
- return new MapRecord(getRecordSchema(), values);
- }
+ private FileCountRemoteIterator<FileStatus> getFileStatusIterator(final
Path path, final boolean recursive, final FileSystem hdfs, final PathFilter
filter) {
+ final Deque<Path> pathStack = new ArrayDeque<>();
+ pathStack.push(path);
- private RecordSchema getRecordSchema() {
- return RECORD_SCHEMA;
- }
+ return new FileCountRemoteIterator<>() {
Review Comment:
This iterator could be simplified if the stack was handled within and the
filtering is not done here.
(`ListHDFS` already does some of it in `determineListable`, so all the
filtering could be combined in one place.)
Also, it would probably be better for it to implement `Iterable` instead of
`RemoteIterator`.
##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -452,243 +305,201 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
// Pull in any file that is newer than the timestamp that we have.
final FileSystem hdfs = getFileSystem();
final boolean recursive =
context.getProperty(RECURSE_SUBDIRS).asBoolean();
- String fileFilterMode =
context.getProperty(FILE_FILTER_MODE).getValue();
+ final PathFilter pathFilter = createPathFilter(context);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final Set<FileStatus> statuses;
+ final HdfsObjectWriter writer = getHdfsObjectWriter(session,
writerFactory);
+
+ long listedFileCount = 0;
try {
final Path rootPath = getNormalizedPath(context, DIRECTORY);
- statuses = getStatuses(rootPath, recursive, hdfs,
createPathFilter(context), fileFilterMode);
- getLogger().debug("Found a total of {} files in HDFS", new
Object[] {statuses.size()});
- } catch (final IOException | IllegalArgumentException e) {
- getLogger().error("Failed to perform listing of HDFS", e);
- return;
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- getLogger().error("Interrupted while performing listing of HDFS",
e);
- return;
- }
-
- final Set<FileStatus> listable = determineListable(statuses, context);
- getLogger().debug("Of the {} files found in HDFS, {} are listable",
new Object[] {statuses.size(), listable.size()});
-
- // Create FlowFile(s) for the listing, if there are any
- if (!listable.isEmpty()) {
- if (context.getProperty(RECORD_WRITER).isSet()) {
- try {
- createRecords(listable, context, session);
- } catch (final IOException | SchemaNotFoundException e) {
- getLogger().error("Failed to write listing of HDFS", e);
- return;
+ final FileCountRemoteIterator<FileStatus> fileStatusIterator =
getFileStatusIterator(rootPath, recursive, hdfs, pathFilter);
+
+ final Long minAgeProp =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE :
minAgeProp;
+ final Long maxAgeProp =
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE :
maxAgeProp;
+
+ writer.beginListing();
+
+ FileStatus status;
+ while (fileStatusIterator.hasNext()) {
+ status = fileStatusIterator.next();
+ if (status != null && determineListable(status, minimumAge,
maximumAge, isTransitioningFromLegacyState, isLegacyLastStatusListed)) {
+ writer.addToListing(status);
+ fileStatusManager.update(status);
+ listedFileCount++;
}
- } else {
- createFlowFiles(listable, session);
}
- }
+ writer.finishListing();
- for (final FileStatus status : listable) {
- final long fileModTime = status.getModificationTime();
- if (fileModTime > latestTimestampEmitted) {
- latestTimestampEmitted = fileModTime;
- }
+ long totalFileCount = fileStatusIterator.getFileCount();
+ getLogger().debug("Found a total of {} files in HDFS, {} are
listed", totalFileCount, listedFileCount);
+ } catch (final IOException | IllegalArgumentException |
SchemaNotFoundException e) {
+ getLogger().error("Failed to perform listing of HDFS", e);
+ writer.finishListingExceptionally(e);
+ return;
Review Comment:
This relates to my previous comment about `HdfsObjectWriter` being too
granular.
When we have a `FlowFileObjectWriter` we don't remove the created flowfiles
from the session and neither do we rollback the session. So the flowfiles will
be emitted.
However the status won't be saved so we will emit the flowfiles for the same
listed files again when the `onTrigger` runs next time. This leads to
duplication of flowfiles.
Consolidating the `HdfsObjectWriter` interface will probably solve this
problem as well.
##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HdfsObjectWriter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+
+import java.io.IOException;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public interface HdfsObjectWriter {
Review Comment:
This interface has many methods that are only implemented by one of its two
implementing classes. That is usually a sign that the separation of concerns is
not adequate.
In this case a single `write` method, or something similar, would probably be
better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]