Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1552005815
########## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java: ########## @@ -177,16 +177,20 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString()); } catch (IOException ioe) { - // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible - // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive. - getLogger().warn("Failed to delete file or directory", ioe); - - Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1); - // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.) - attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage()); - - session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship()); - failedPath++; + if (handleAuthErrors(ioe, session, context)) { + return null; + } else { + // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible + // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive. + getLogger().warn("Failed to delete file or directory", ioe); + + Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1); + // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.) 
+ attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage()); + + session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship()); + failedPath++; + } } } } Review Comment: Error handling of GSSException should be added to line 213 outer catch as well, because the fileSystem.exists is called outside of the inner catch block. ########## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java: ########## @@ -294,7 +302,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro if (logEmptyListing.getAndDecrement() > 0) { getLogger().info( "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", - new Object[]{millis, listedFiles.size(), newItems}); + millis, listedFiles.size(), newItems); Review Comment: Could not comment on line 450: In processBatchOfFiles line 450 the GSSException should be handled as well. ########## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java: ########## @@ -254,8 +254,16 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro if (!directoryExists) { throw new IOException("Input Directory or File does not exist in HDFS"); } + } catch (final IOException e) { Review Comment: The IOException does not need to be handled differently, it can be handled in the Exception catch branch ```java catch (Exception e) { if (!handleAuthErrors(e, session, context)) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e); flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } return; } ``` ########## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java: 
########## @@ -372,63 +365,60 @@ public Object run() { // Write FlowFile to temp file on HDFS final StopWatch stopWatch = new StopWatch(true); - session.read(putFlowFile, new InputStreamCallback() { - - @Override - public void process(InputStream in) throws IOException { - OutputStream fos = null; - Path createdFile = null; - try { - if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) { - fos = hdfs.append(copyFile, bufferSize); - } else { - final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); - - if (shouldIgnoreLocality(context, session)) { - cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY); - } + session.read(putFlowFile, in -> { + OutputStream fos = null; + Path createdFile = null; + try { + if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) { + fos = hdfs.append(copyFile, bufferSize); + } else { + final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); - fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(), - FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize, - null, null); + if (shouldIgnoreLocality(context, session)) { + cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY); } - if (codec != null) { - fos = codec.createOutputStream(fos); + fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(), + FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize, + null, null); + } + + if (codec != null) { + fos = codec.createOutputStream(fos); + } + createdFile = actualCopyFile; + BufferedInputStream bis = new BufferedInputStream(in); + StreamUtils.copy(bis, fos); + bis = null; + fos.flush(); + } catch (IOException e) { + throw new ProcessException(e); Review Comment: This catch block can be eliminated. The StandardProcessSession::read catches IOExceptions and propagates them wrapped in a ProcessException. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org