This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7306214 NIFI-9076: hdfs operations in MoveHDFS wrapped in UGI.doAs()
7306214 is described below
commit 73062146c6b3e2f388055b2613adf3e8e21d4825
Author: Peter Gyori <[email protected]>
AuthorDate: Tue Aug 24 17:18:20 2021 +0200
NIFI-9076: hdfs operations in MoveHDFS wrapped in UGI.doAs()
This closes #5334.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../apache/nifi/processors/hadoop/MoveHDFS.java | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index acb4c85..b8a61e3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -52,6 +52,7 @@ import org.apache.nifi.util.StopWatch;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -247,7 +248,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
Path inputPath;
try {
inputPath = getNormalizedPath(context, INPUT_DIRECTORY_OR_FILE,
flowFile);
- if (!hdfs.exists(inputPath)) {
+ final boolean directoryExists =
getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () ->
hdfs.exists(inputPath));
+ if (!directoryExists) {
throw new IOException("Input Directory or File does not exist
in HDFS");
}
} catch (Exception e) {
@@ -297,6 +299,11 @@ public class MoveHDFS extends AbstractHadoopProcessor {
context.yield();
getLogger().warn("Error while retrieving list of files due to {}",
new Object[]{e});
return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ context.yield();
+ getLogger().warn("Interrupted while retrieving files", e);
+ return;
}
// prepare to process a batch of files in the queue
@@ -434,7 +441,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
}
}
- protected Set<Path> performListing(final ProcessContext context, Path
path) throws IOException {
+ protected Set<Path> performListing(final ProcessContext context, Path
path) throws IOException, InterruptedException {
Set<Path> listing = null;
if (listingLock.tryLock()) {
@@ -464,21 +471,25 @@ public class MoveHDFS extends AbstractHadoopProcessor {
}
protected Set<Path> selectFiles(final FileSystem hdfs, final Path
inputPath, Set<Path> filesVisited)
- throws IOException {
+ throws IOException, InterruptedException {
if (null == filesVisited) {
filesVisited = new HashSet<>();
}
- if (!hdfs.exists(inputPath)) {
+ UserGroupInformation ugi = getUserGroupInformation();
+
+ final boolean directoryExists =
ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(inputPath));
+ if (!directoryExists) {
throw new IOException("Selection directory " +
inputPath.toString() + " doesn't appear to exist!");
}
final Set<Path> files = new HashSet<>();
- FileStatus inputStatus = hdfs.getFileStatus(inputPath);
+ FileStatus inputStatus =
ugi.doAs((PrivilegedExceptionAction<FileStatus>) () ->
hdfs.getFileStatus(inputPath));
if (inputStatus.isDirectory()) {
- for (final FileStatus file : hdfs.listStatus(inputPath)) {
+ FileStatus[] fileStatuses =
ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () ->
hdfs.listStatus(inputPath));
+ for (final FileStatus file : fileStatuses) {
final Path canonicalFile = file.getPath();
if (!filesVisited.add(canonicalFile)) { // skip files we've
already seen (may be looping directory links)