This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7306214  NIFI-9076: hdfs operations in MoveHDFS wrapped in UGI.doAs()
7306214 is described below

commit 73062146c6b3e2f388055b2613adf3e8e21d4825
Author: Peter Gyori <[email protected]>
AuthorDate: Tue Aug 24 17:18:20 2021 +0200

    NIFI-9076: hdfs operations in MoveHDFS wrapped in UGI.doAs()
    
    This closes #5334.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../apache/nifi/processors/hadoop/MoveHDFS.java    | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index acb4c85..b8a61e3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -52,6 +52,7 @@ import org.apache.nifi.util.StopWatch;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -247,7 +248,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         Path inputPath;
         try {
             inputPath = getNormalizedPath(context, INPUT_DIRECTORY_OR_FILE, flowFile);
-            if (!hdfs.exists(inputPath)) {
+            final boolean directoryExists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(inputPath));
+            if (!directoryExists) {
                 throw new IOException("Input Directory or File does not exist in HDFS");
             }
         } catch (Exception e) {
@@ -297,6 +299,11 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             context.yield();
             getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
             return;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            context.yield();
+            getLogger().warn("Interrupted while retrieving files", e);
+            return;
         }
 
         // prepare to process a batch of files in the queue
@@ -434,7 +441,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
+    protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException, InterruptedException {
         Set<Path> listing = null;
 
         if (listingLock.tryLock()) {
@@ -464,21 +471,25 @@ public class MoveHDFS extends AbstractHadoopProcessor {
     }
 
     protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
-            throws IOException {
+            throws IOException, InterruptedException {
         if (null == filesVisited) {
             filesVisited = new HashSet<>();
         }
 
-        if (!hdfs.exists(inputPath)) {
+        UserGroupInformation ugi = getUserGroupInformation();
+
+        final boolean directoryExists = ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(inputPath));
+        if (!directoryExists) {
             throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
         }
 
         final Set<Path> files = new HashSet<>();
 
-        FileStatus inputStatus = hdfs.getFileStatus(inputPath);
+        FileStatus inputStatus = ugi.doAs((PrivilegedExceptionAction<FileStatus>) () -> hdfs.getFileStatus(inputPath));
 
         if (inputStatus.isDirectory()) {
-            for (final FileStatus file : hdfs.listStatus(inputPath)) {
+            FileStatus[] fileStatuses = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(inputPath));
+            for (final FileStatus file : fileStatuses) {
                 final Path canonicalFile = file.getPath();
 
                 if (!filesVisited.add(canonicalFile)) { // skip files we've already seen (may be looping directory links)

Reply via email to