Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1552005815


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java:
##########
@@ -177,16 +177,20 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                             flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                             session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
                         } catch (IOException ioe) {
-                            // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
-                            // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-                            getLogger().warn("Failed to delete file or directory", ioe);
-
-                            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
-                            // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
-                            attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
-
-                            session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
-                            failedPath++;
+                            if (handleAuthErrors(ioe, session, context)) {
+                                return null;
+                            } else {
+                                // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
+                                // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+                                getLogger().warn("Failed to delete file or directory", ioe);
+
+                                Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
+                                // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
+                                attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
+
+                                session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
+                                failedPath++;
+                            }
                         }
                     }
                 }

Review Comment:
   Error handling for GSSException should be added to the outer catch at line 213 as well, because fileSystem.exists is called outside of the inner catch block. A rough sketch follows below.
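   
   Purely as an illustration, a minimal sketch of the suggested shape, assuming the handleAuthErrors(Exception, ProcessSession, ProcessContext) helper introduced by this PR; the log message and the exact code surrounding the outer catch are hypothetical:
   
   ```java
   // Hypothetical outer catch around the fileSystem.exists call; the existing
   // failure handling is kept when the exception is not a GSS authentication error.
   } catch (IOException e) {
       if (handleAuthErrors(e, session, context)) {
           return null;
       }
       getLogger().error("Failed to check existence of path", e); // placeholder message
       session.transfer(session.penalize(flowFile), getFailureRelationship());
   }
   ```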



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##########
@@ -294,7 +302,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 if (logEmptyListing.getAndDecrement() > 0) {
                     getLogger().info(
                             "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
-                            new Object[]{millis, listedFiles.size(), newItems});
+                            millis, listedFiles.size(), newItems);

Review Comment:
   Could not comment on line 450 directly: in processBatchOfFiles, the GSSException should be handled at line 450 as well; see the sketch below.
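   
   A minimal sketch of the suggested shape (the body of processBatchOfFiles at line 450 is not shown in this diff, so the catch contents here are placeholders):
   
   ```java
   } catch (final Exception e) {
       // Only run the existing failure handling when handleAuthErrors
       // did not already take care of a GSS authentication error.
       if (!handleAuthErrors(e, session, context)) {
           getLogger().error("Failed to process file batch", e); // placeholder message
           session.transfer(session.penalize(flowFile), REL_FAILURE);
       }
   }
   ```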



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##########
@@ -254,8 +254,16 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             if (!directoryExists) {
                 throw new IOException("Input Directory or File does not exist in HDFS");
             }
+        } catch (final IOException e) {

Review Comment:
   The IOException does not need a separate catch branch; it can be handled in the Exception catch branch:
   
   ```java
   catch (Exception e) {
       if (!handleAuthErrors(e, session, context)) {
           getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e);
           flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
           flowFile = session.penalize(flowFile);
           session.transfer(flowFile, REL_FAILURE);
       }
       return;
   }
   ```



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##########
@@ -372,63 +365,60 @@ public Object run() {
 
                     // Write FlowFile to temp file on HDFS
                     final StopWatch stopWatch = new StopWatch(true);
-                    session.read(putFlowFile, new InputStreamCallback() {
-
-                        @Override
-                        public void process(InputStream in) throws IOException {
-                            OutputStream fos = null;
-                            Path createdFile = null;
-                            try {
-                                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                                    fos = hdfs.append(copyFile, bufferSize);
-                                } else {
-                                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                                    if (shouldIgnoreLocality(context, session)) {
-                                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                                    }
+                    session.read(putFlowFile, in -> {
+                        OutputStream fos = null;
+                        Path createdFile = null;
+                        try {
+                            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                                fos = hdfs.append(copyFile, bufferSize);
+                            } else {
+                                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-                                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                                            null, null);
+                                if (shouldIgnoreLocality(context, session)) {
+                                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                 }
 
-                                if (codec != null) {
-                                    fos = codec.createOutputStream(fos);
+                                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                                FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                                        null, null);
+                            }
+
+                            if (codec != null) {
+                                fos = codec.createOutputStream(fos);
+                            }
+                            createdFile = actualCopyFile;
+                            BufferedInputStream bis = new BufferedInputStream(in);
+                            StreamUtils.copy(bis, fos);
+                            bis = null;
+                            fos.flush();
+                        } catch (IOException e) {
+                            throw new ProcessException(e);

Review Comment:
   This catch block can be eliminated: StandardProcessSession::read already catches IOExceptions and propagates them wrapped in a ProcessException. A trimmed sketch follows below.
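   
   For illustration, a trimmed sketch of the callback with that catch removed (same identifiers as the diff above; the append/codec branches and the createdFile bookkeeping are elided, and the simplified hdfs.create overload is used only for brevity):
   
   ```java
   session.read(putFlowFile, in -> {
       // An IOException thrown here propagates to StandardProcessSession.read,
       // which rethrows it wrapped in a ProcessException, so no local
       // rewrapping catch is needed.
       try (OutputStream fos = hdfs.create(actualCopyFile, true)) {
           StreamUtils.copy(new BufferedInputStream(in), fos);
           fos.flush();
       }
   });
   ```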


