This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6a8a8caa4c NIFI-11902: Fix ListHDFS closes FileSystem in first run
6a8a8caa4c is described below
commit 6a8a8caa4c202ff50bf0f51edce749eb915721f7
Author: Lehel Boer <[email protected]>
AuthorDate: Thu Aug 3 00:07:38 2023 +0200
NIFI-11902: Fix ListHDFS closes FileSystem in first run
This closes #7565.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../apache/nifi/processors/hadoop/ListHDFS.java | 98 +++++++++++-----------
1 file changed, 48 insertions(+), 50 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index e706ab65f0..4bba8572ca 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -257,58 +257,56 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
// Pull in any file that is newer than the timestamp that we have.
- try (final FileSystem hdfs = getFileSystem()) {
- final boolean recursive =
context.getProperty(RECURSE_SUBDIRS).asBoolean();
- final PathFilter pathFilter = createPathFilter(context);
- final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
- final FileStatusManager fileStatusManager = new
FileStatusManager(latestTimestamp, latestFiles);
- final Path rootPath = getNormalizedPath(context, DIRECTORY);
- final FileStatusIterable fileStatusIterable = new
FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
-
- final Long minAgeProp =
context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE :
minAgeProp;
- final Long maxAgeProp =
context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE :
maxAgeProp;
-
- final HadoopFileStatusWriter writer =
HadoopFileStatusWriter.builder()
- .session(session)
- .successRelationship(getSuccessRelationship())
- .fileStatusIterable(fileStatusIterable)
- .fileStatusManager(fileStatusManager)
- .pathFilter(pathFilter)
- .minimumAge(minimumAge)
- .maximumAge(maximumAge)
- .previousLatestTimestamp(latestTimestamp)
- .previousLatestFiles(latestFiles)
- .writerFactory(writerFactory)
- .hdfsPrefix(getAttributePrefix())
- .logger(getLogger())
- .build();
-
- writer.write();
-
- getLogger().debug("Found a total of {} files in HDFS, {} are
listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
-
- if (writer.getListedFileCount() > 0) {
- final Map<String, String> updatedState = new HashMap<>();
- updatedState.put(LATEST_TIMESTAMP_KEY,
String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
- final List<String> files =
fileStatusManager.getCurrentLatestFiles();
- for (int i = 0; i < files.size(); i++) {
- final String currentFilePath = files.get(i);
- updatedState.put(String.format(LATEST_FILES_KEY, i),
currentFilePath);
- }
- getLogger().debug("New state map: {}", updatedState);
- updateState(session, updatedState);
-
- getLogger().info("Successfully created listing with {} new
files from HDFS", writer.getListedFileCount());
- } else {
- getLogger().debug("There is no data to list. Yielding.");
- context.yield();
+ final FileSystem hdfs = getFileSystem();
+ final boolean recursive =
context.getProperty(RECURSE_SUBDIRS).asBoolean();
+ final PathFilter pathFilter = createPathFilter(context);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ final FileStatusManager fileStatusManager = new
FileStatusManager(latestTimestamp, latestFiles);
+ final Path rootPath = getNormalizedPath(context, DIRECTORY);
+ final FileStatusIterable fileStatusIterable = new
FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
+
+ final Long minAgeProp =
context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE :
minAgeProp;
+ final Long maxAgeProp =
context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE :
maxAgeProp;
+
+ final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
+ .session(session)
+ .successRelationship(getSuccessRelationship())
+ .fileStatusIterable(fileStatusIterable)
+ .fileStatusManager(fileStatusManager)
+ .pathFilter(pathFilter)
+ .minimumAge(minimumAge)
+ .maximumAge(maximumAge)
+ .previousLatestTimestamp(latestTimestamp)
+ .previousLatestFiles(latestFiles)
+ .writerFactory(writerFactory)
+ .hdfsPrefix(getAttributePrefix())
+ .logger(getLogger())
+ .build();
+
+ writer.write();
+
+ getLogger().debug("Found a total of {} files in HDFS, {} are listed",
fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
+
+ if (writer.getListedFileCount() > 0) {
+ final Map<String, String> updatedState = new HashMap<>();
+ updatedState.put(LATEST_TIMESTAMP_KEY,
String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+ final List<String> files =
fileStatusManager.getCurrentLatestFiles();
+ for (int i = 0; i < files.size(); i++) {
+ final String currentFilePath = files.get(i);
+ updatedState.put(String.format(LATEST_FILES_KEY, i),
currentFilePath);
}
- } catch (IOException e) {
- throw new ProcessException("IO error occurred when closing HDFS
file system", e);
+ getLogger().debug("New state map: {}", updatedState);
+ updateState(session, updatedState);
+
+ getLogger().info("Successfully created listing with {} new files
from HDFS", writer.getListedFileCount());
+ } else {
+ getLogger().debug("There is no data to list. Yielding.");
+ context.yield();
}
+
}
private PathFilter createPathFilter(final ProcessContext context) {