This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new b04b9b2213 NIFI-12889 - Retry Kerberos login on auth failure in HDFS processors
b04b9b2213 is described below
commit b04b9b2213f902d8fd912ab3ba950875deb7e9e4
Author: Matt Burgess <[email protected]>
AuthorDate: Tue Apr 9 13:26:43 2024 -0400
NIFI-12889 - Retry Kerberos login on auth failure in HDFS processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #8618.
---
.../processors/hadoop/AbstractHadoopProcessor.java | 46 +++-
.../apache/nifi/processors/hadoop/DeleteHDFS.java | 33 ++-
.../apache/nifi/processors/hadoop/FetchHDFS.java | 100 ++++----
.../org/apache/nifi/processors/hadoop/GetHDFS.java | 21 +-
.../nifi/processors/hadoop/GetHDFSFileInfo.java | 16 +-
.../processors/hadoop/GetHDFSSequenceFile.java | 19 +-
.../apache/nifi/processors/hadoop/MoveHDFS.java | 180 +++++++-------
.../org/apache/nifi/processors/hadoop/PutHDFS.java | 126 ++++------
.../GSSExceptionRollbackYieldSessionHandler.java | 30 +++
.../processors/hadoop/GetHDFSSequenceFileTest.java | 57 ++++-
.../apache/nifi/processors/hadoop/GetHDFSTest.java | 39 ++-
.../nifi/processors/hadoop/MoveHDFSTest.java | 62 ++++-
.../apache/nifi/processors/hadoop/PutHDFSTest.java | 191 +++-----------
.../nifi/processors/hadoop/TestDeleteHDFS.java | 20 +-
.../nifi/processors/hadoop/TestFetchHDFS.java | 62 +++--
.../processors/hadoop/TestGetHDFSFileInfo.java | 184 +++-----------
.../processors/hadoop/util/MockFileSystem.java | 273 +++++++++++++++++++++
17 files changed, 854 insertions(+), 605 deletions(-)
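
The change applies one recurring pattern across the HDFS processors: each catch block first consults the new AbstractHadoopProcessor.handleAuthErrors(...) helper, which checks whether a GSSException with major code NO_CRED (a missing or expired Kerberos ticket) hides anywhere in the causal chain. If it does, the helper resets the cached HDFS resources so the next trigger performs a fresh Kerberos login, then delegates session cleanup to the supplied handler. A minimal sketch of the resulting catch-block shape (variable names assumed from the surrounding processor code):

    } catch (final IOException e) {
        // Auth failure: reset HDFS resources, roll back the session, yield the processor
        if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
            // Any other IOException keeps the pre-existing failure routing
            getLogger().error("Failed to access HDFS", e);
            session.transfer(flowFile, getFailureRelationship());
        }
    }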
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 960e7a5b7c..12b9bbc68c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.hadoop;
+import com.google.common.base.Throwables;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -42,11 +43,13 @@ import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
+import org.ietf.jgss.GSSException;
import javax.net.SocketFactory;
import java.io.File;
@@ -62,7 +65,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
/**
 * This is a base class that is helpful when building processors interacting with HDFS.
@@ -171,7 +177,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
// variables shared by all threads of this processor
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
-    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
+    final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
    // Holder of cached Configuration information so validation does not reload the same config over and over
    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
@@ -532,12 +538,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
        try {
-            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-                @Override
-                public FileSystem run() throws Exception {
-                    return FileSystem.get(config);
-                }
-            });
+            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
        } catch (InterruptedException e) {
            throw new IOException("Unable to create file system: " + e.getMessage());
        }
@@ -703,4 +704,35 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
        return new Path(path.replaceAll("/+", "/"));
    }
+
+    /**
+     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type
+     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
+     * @param t The throwable to inspect for the cause.
+     * @return Throwable Cause
+     */
+    protected <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
+        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
+        return causalChain
+                .filter(expectedCauseType::isInstance)
+                .map(expectedCauseType::cast)
+                .filter(causePredicate)
+                .findFirst();
+    }
+
+    protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context, BiConsumer<ProcessSession, ProcessContext> sessionHandler) {
+        Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+        if (causeOptional.isPresent()) {
+            getLogger().error("An error occurred while connecting to HDFS. Rolling back session and resetting HDFS resources", causeOptional.get());
+            try {
+                hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
+            } catch (IOException ioe) {
+                getLogger().error("An error occurred resetting HDFS resources; you may need to restart the processor.");
+            }
+            sessionHandler.accept(session, context);
+            return true;
+        }
+        return false;
+    }
}
\ No newline at end of file
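
For reference, findCause(...) walks the exception chain with Guava's Throwables.getCausalChain(). A hedged example of the nesting it is meant to unwrap (the same IOException -> SaslException -> GSSException chain that MoveHDFSTest constructs below; 13 is the value of GSSException.NO_CRED):

    // Hadoop RPC usually surfaces credential problems wrapped several levels deep
    IOException wrapped = new IOException("ioe", new SaslException("sasle", new GSSException(13)));
    Optional<GSSException> cause = findCause(wrapped, GSSException.class,
            gsse -> GSSException.NO_CRED == gsse.getMajor());
    // cause.isPresent() == true, so handleAuthErrors(...) resets resources and returns true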
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 0cab0b0bdc..a22991eed8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -39,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import java.io.IOException;
import java.security.PrivilegedAction;
@@ -177,16 +178,20 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                        flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                        session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
                    } catch (IOException ioe) {
-                        // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
-                        // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-                        getLogger().warn("Failed to delete file or directory", ioe);
-
-                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
-                        // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
-                        attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
-
-                        session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
-                        failedPath++;
+                        if (handleAuthErrors(ioe, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                            return null;
+                        } else {
+                            // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
+                            // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+                            getLogger().warn("Failed to delete file or directory", ioe);
+
+                            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
+                            // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
+                            attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
+
+                            session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
+                            failedPath++;
+                        }
}
}
}
@@ -198,8 +203,12 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
session.remove(flowFile);
}
} catch (IOException e) {
- getLogger().error("Error processing delete for flowfile {} due
to {}", flowFile, e.getMessage(), e);
- session.transfer(flowFile, getFailureRelationship());
+ if (handleAuthErrors(e, session, context, new
GSSExceptionRollbackYieldSessionHandler())) {
+ return null;
+ } else {
+ getLogger().error("Error processing delete for flowfile {}
due to {}", flowFile, e.getMessage(), e);
+ session.transfer(flowFile, getFailureRelationship());
+ }
}
return null;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 2a4986cadd..e026c01862 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;
import java.io.FileNotFoundException;
@@ -141,60 +142,59 @@ public class FetchHDFS extends AbstractHadoopProcessor {
final StopWatch stopWatch = new StopWatch(true);
final FlowFile finalFlowFile = flowFile;
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                InputStream stream = null;
-                CompressionCodec codec = null;
-                Configuration conf = getConfiguration();
-                final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
-                final CompressionType compressionType = getCompressionType(context);
-                final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
-
-                if(inferCompressionCodec) {
-                    codec = compressionCodecFactory.getCodec(path);
-                } else if (compressionType != CompressionType.NONE) {
-                    codec = getCompressionCodec(context, getConfiguration());
-                }
+        ugi.doAs((PrivilegedAction<Object>) () -> {
+            InputStream stream = null;
+            CompressionCodec codec = null;
+            Configuration conf = getConfiguration();
+            final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
+            final CompressionType compressionType = getCompressionType(context);
+            final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+
+            if (inferCompressionCodec) {
+                codec = compressionCodecFactory.getCodec(path);
+            } else if (compressionType != CompressionType.NONE) {
+                codec = getCompressionCodec(context, getConfiguration());
+            }

-                FlowFile flowFile = finalFlowFile;
-                final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
-                try {
-                    final String outputFilename;
-                    final String originalFilename = path.getName();
-                    stream = hdfs.open(path, 16384);
-
-                    // Check if compression codec is defined (inferred or otherwise)
-                    if (codec != null) {
-                        stream = codec.createInputStream(stream);
-                        outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
-                    } else {
-                        outputFilename = originalFilename;
-                    }
-
-                    flowFile = session.importFrom(stream, finalFlowFile);
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
-
-                    stopWatch.stop();
-                    getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
-                    flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-                    session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
-                    session.transfer(flowFile, getSuccessRelationship());
-                } catch (final FileNotFoundException | AccessControlException e) {
-                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
-                    flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, getFailureRelationship());
-                } catch (final IOException e) {
-                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, getCommsFailureRelationship());
-                } finally {
-                    IOUtils.closeQuietly(stream);
+            FlowFile outgoingFlowFile = finalFlowFile;
+            final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+            try {
+                final String outputFilename;
+                final String originalFilename = path.getName();
+                stream = hdfs.open(path, 16384);
+
+                // Check if compression codec is defined (inferred or otherwise)
+                if (codec != null) {
+                    stream = codec.createInputStream(stream);
+                    outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+                } else {
+                    outputFilename = originalFilename;
                }
-                return null;
+                outgoingFlowFile = session.importFrom(stream, finalFlowFile);
+                outgoingFlowFile = session.putAttribute(outgoingFlowFile, CoreAttributes.FILENAME.key(), outputFilename);
+
+                stopWatch.stop();
+                getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
+                outgoingFlowFile = session.putAttribute(outgoingFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
+                session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                session.transfer(outgoingFlowFile, getSuccessRelationship());
+            } catch (final FileNotFoundException | AccessControlException e) {
+                getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, outgoingFlowFile, e});
+                outgoingFlowFile = session.putAttribute(outgoingFlowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
+                outgoingFlowFile = session.penalize(outgoingFlowFile);
+                session.transfer(outgoingFlowFile, getFailureRelationship());
+            } catch (final IOException e) {
+                if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", qualifiedPath, outgoingFlowFile, e);
+                    outgoingFlowFile = session.penalize(outgoingFlowFile);
+                    session.transfer(outgoingFlowFile, getCommsFailureRelationship());
+                }
+            } finally {
+                IOUtils.closeQuietly(stream);
            }
+
+            return null;
        });
    }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index f1e8661366..c168a18fa2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -49,6 +49,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
@@ -294,12 +295,12 @@ public class GetHDFS extends AbstractHadoopProcessor {
}
                if (logEmptyListing.getAndDecrement() > 0) {
                    getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
-                            new Object[]{millis, listedFiles.size(), newItems});
+                            millis, listedFiles.size(), newItems);
                }
            }
        } catch (IOException e) {
-            context.yield();
-            getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
+            handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
+            getLogger().warn("Error while retrieving list of files due to {}", e.getMessage(), e);
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -388,20 +389,20 @@ public class GetHDFS extends AbstractHadoopProcessor {
                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

                if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
-                    getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
-                            new Object[]{file});
+                    getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", file);
                    session.remove(flowFile);
                    continue;
                }

                session.getProvenanceReporter().receive(flowFile, file.toString());
                session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
-                        new Object[]{flowFile, file, millis, dataRate});
+                getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", flowFile, file, millis, dataRate);
            } catch (final Throwable t) {
-                getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
-                session.rollback();
-                context.yield();
+                if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                    getLogger().error("Error retrieving file {} from HDFS due to {}", file, t);
+                    session.rollback();
+                    context.yield();
+                }
} finally {
IOUtils.closeQuietly(stream);
stream = null;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
index 8383732f77..23541f0e81 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
@@ -55,6 +55,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;

 import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.ALL;
 import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.DIR;
@@ -75,10 +76,10 @@ import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequ
        @WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the object in HDFS"),
        @WritesAttribute(attribute = "hdfs.group", description = "The group that owns the object in HDFS"),
        @WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
-        @WritesAttribute(attribute = "hdfs.length", description = ""
-                + "In case of files: The number of bytes in the file in HDFS. "
+        @WritesAttribute(attribute = "hdfs.length", description =
+                "In case of files: The number of bytes in the file in HDFS. "
                + "In case of dirs: Retuns storage space consumed by directory. "
-                + ""),
+                ),
        @WritesAttribute(attribute = "hdfs.count.files", description = "In case of type='directory' will represent total count of files under this dir. "
                + "Won't be populated to other types of HDFS objects. "),
        @WritesAttribute(attribute = "hdfs.count.dirs", description = "In case of type='directory' will represent total count of directories under this dir (including itself). "
@@ -327,9 +328,12 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
            session.transfer(ff, REL_FAILURE);
        } catch (final Exception e) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e});
-            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
-            session.transfer(ff, REL_FAILURE);
+            // Catch GSSExceptions and reset the resources
+            if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e});
+                ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
+                session.transfer(ff, REL_FAILURE);
+            }
        }
    }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 86fe791733..6a674599b7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -30,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.apache.nifi.util.StopWatch;
@@ -109,9 +110,14 @@ public class GetHDFSSequenceFile extends GetHDFS {
                    logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over...");
                }
            } catch (Throwable t) {
-                logger.error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
-                session.rollback();
-                context.yield();
+                final String errorString = "Error retrieving file {} from HDFS due to {}";
+                if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                    logger.error(errorString, file, t);
+                    session.rollback();
+                    context.yield();
+                } else {
+                    logger.warn(errorString, file, t);
+                }
} finally {
stopWatch.stop();
long totalSize = 0;
@@ -132,12 +138,7 @@ public class GetHDFSSequenceFile extends GetHDFS {
}
    protected Set<FlowFile> getFlowFiles(final Configuration conf, final FileSystem hdfs, final SequenceFileReader<Set<FlowFile>> reader, final Path file) throws Exception {
-        PrivilegedExceptionAction<Set<FlowFile>> privilegedExceptionAction = new PrivilegedExceptionAction<Set<FlowFile>>() {
-            @Override
-            public Set<FlowFile> run() throws Exception {
-                return reader.readSequenceFile(file, conf, hdfs);
-            }
-        };
+        PrivilegedExceptionAction<Set<FlowFile>> privilegedExceptionAction = () -> reader.readSequenceFile(file, conf, hdfs);
UserGroupInformation userGroupInformation = getUserGroupInformation();
if (userGroupInformation == null) {
return privilegedExceptionAction.run();
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 33e1fac44c..aba9065820 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -46,10 +46,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -57,6 +60,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -255,7 +259,10 @@ public class MoveHDFS extends AbstractHadoopProcessor {
throw new IOException("Input Directory or File does not exist
in HDFS");
}
} catch (Exception e) {
- getLogger().error("Failed to retrieve content from {} for {} due
to {}; routing to failure", new Object[]{filenameValue, flowFile, e});
+ if (handleAuthErrors(e, session, context, new
GSSExceptionRollbackYieldSessionHandler())) {
+ return;
+ }
+ getLogger().error("Failed to retrieve content from {} for {} due
to {}; routing to failure", filenameValue, flowFile, e);
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason",
e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@@ -294,7 +301,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
if (logEmptyListing.getAndDecrement() > 0) {
getLogger().info(
"Obtained file listing in {} milliseconds; listing
had {} items, {} of which were new",
- new Object[]{millis, listedFiles.size(),
newItems});
+ millis, listedFiles.size(), newItems);
}
}
} catch (IOException e) {
@@ -322,7 +329,12 @@ public class MoveHDFS extends AbstractHadoopProcessor {
queueLock.unlock();
}
- processBatchOfFiles(files, context, session, flowFile);
+ try {
+ processBatchOfFiles(files, context, session, flowFile);
+ session.remove(flowFile);
+ } catch (UncheckedIOException e) {
+            handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
+ }
queueLock.lock();
try {
@@ -330,8 +342,6 @@ public class MoveHDFS extends AbstractHadoopProcessor {
} finally {
queueLock.unlock();
}
-
- session.remove(flowFile);
}
    protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
@@ -352,95 +362,95 @@ public class MoveHDFS extends AbstractHadoopProcessor {
for (final Path file : files) {
-            ugi.doAs(new PrivilegedAction<Object>() {
-                @Override
-                public Object run() {
-                    FlowFile flowFile = session.create(parentFlowFile);
-                    try {
-                        final String originalFilename = file.getName();
-                        final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
-                        final Path newFile = new Path(outputDirPath, originalFilename);
-                        final boolean destinationExists = hdfs.exists(newFile);
-                        // If destination file already exists, resolve that
-                        // based on processor configuration
-                        if (destinationExists) {
-                            switch (processorConfig.getConflictResolution()) {
-                                case REPLACE_RESOLUTION:
-                                    // Remove destination file (newFile) to replace
-                                    if (hdfs.delete(newFile, false)) {
-                                        getLogger().info("deleted {} in order to replace with the contents of {}",
-                                                new Object[]{newFile, flowFile});
-                                    }
-                                    break;
-                                case IGNORE_RESOLUTION:
-                                    session.transfer(flowFile, REL_SUCCESS);
-                                    getLogger().info(
-                                            "transferring {} to success because file with same name already exists",
-                                            new Object[]{flowFile});
-                                    return null;
-                                case FAIL_RESOLUTION:
-                                    session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                    getLogger().warn(
-                                            "penalizing {} and routing to failure because file with same name already exists",
-                                            new Object[]{flowFile});
-                                    return null;
-                                default:
-                                    break;
-                            }
+            ugi.doAs((PrivilegedAction<Object>) () -> {
+                FlowFile flowFile = session.create(parentFlowFile);
+                try {
+                    final String originalFilename = file.getName();
+                    final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
+                    final Path newFile = new Path(outputDirPath, originalFilename);
+                    final boolean destinationExists = hdfs.exists(newFile);
+                    // If destination file already exists, resolve that
+                    // based on processor configuration
+                    if (destinationExists) {
+                        switch (processorConfig.getConflictResolution()) {
+                            case REPLACE_RESOLUTION:
+                                // Remove destination file (newFile) to replace
+                                if (hdfs.delete(newFile, false)) {
+                                    getLogger().info("deleted {} in order to replace with the contents of {}",
+                                            new Object[]{newFile, flowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(flowFile, REL_SUCCESS);
+                                getLogger().info(
+                                        "transferring {} to success because file with same name already exists",
+                                        new Object[]{flowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                                getLogger().warn(
+                                        "penalizing {} and routing to failure because file with same name already exists",
+                                        new Object[]{flowFile});
+                                return null;
+                            default:
+                                break;
                        }
+                    }

-                        // Create destination directory if it does not exist
-                        try {
-                            if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
-                                throw new IOException(outputDirPath.toString()
-                                        + " already exists and is not a directory");
-                            }
-                        } catch (FileNotFoundException fe) {
-                            if (!hdfs.mkdirs(outputDirPath)) {
-                                throw new IOException(outputDirPath.toString() + " could not be created");
-                            }
-                            changeOwner(context, hdfs, outputDirPath);
+                    // Create destination directory if it does not exist
+                    try {
+                        if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
+                            throw new IOException(outputDirPath + " already exists and is not a directory");
+                        }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(outputDirPath)) {
+                            throw new IOException(outputDirPath + " could not be created");
                        }
+                        changeOwner(context, hdfs, outputDirPath);
+                    }

-                        boolean moved = false;
-                        for (int i = 0; i < 10; i++) { // try to rename multiple
-                            // times.
-                            if (processorConfig.getOperation().equals("move")) {
-                                if (hdfs.rename(file, newFile)) {
-                                    moved = true;
-                                    break;// rename was successful
-                                }
-                            } else {
-                                if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
-                                    moved = true;
-                                    break;// copy was successful
-                                }
+                    boolean moved = false;
+                    for (int i = 0; i < 10; i++) { // try to rename multiple
+                        // times.
+                        if (processorConfig.getOperation().equals("move")) {
+                            if (hdfs.rename(file, newFile)) {
+                                moved = true;
+                                break;// rename was successful
+                            }
+                        } else {
+                            if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
+                                moved = true;
+                                break;// copy was successful
                            }
-                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
-                        }
-                        if (!moved) {
-                            throw new ProcessException("Could not move file " + file + " to its final filename");
                        }
+                        Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                    }
+                    if (!moved) {
+                        throw new ProcessException("Could not move file " + file + " to its final filename");
+                    }

-                        changeOwner(context, hdfs, newFile);
-                        final String outputPath = newFile.toString();
-                        final String newFilename = newFile.getName();
-                        final String hdfsPath = newFile.getParent().toString();
-                        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
-                        flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                        final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
-                        flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-                        final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
-                        session.getProvenanceReporter().send(flowFile, transitUri);
-                        session.transfer(flowFile, REL_SUCCESS);
-
-                    } catch (final Throwable t) {
-                        getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
-                        session.transfer(session.penalize(flowFile), REL_FAILURE);
-                        context.yield();
+                    changeOwner(context, hdfs, newFile);
+                    final String outputPath = newFile.toString();
+                    final String newFilename = newFile.getName();
+                    final String hdfsPath = newFile.getParent().toString();
+                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+                    flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
+                    final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
+                    session.getProvenanceReporter().send(flowFile, transitUri);
+                    session.transfer(flowFile, REL_SUCCESS);
+
+                } catch (final Throwable t) {
+                    final Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                    if (causeOptional.isPresent()) {
+                        throw new UncheckedIOException(new IOException(causeOptional.get()));
                    }
-                    return null;
+                    getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
+                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                    context.yield();
                }
+                return null;
            });
        }
    }
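
Note on the MoveHDFS shape: the failure occurs inside ugi.doAs(PrivilegedAction), where a checked IOException cannot propagate, so the patch tunnels the detected GSSException out as an UncheckedIOException; the new try/catch added around processBatchOfFiles(...) in the earlier hunk then hands it to handleAuthErrors(...). The two cooperating pieces, condensed (sketch only; the surrounding code is as shown in the hunks above):

    // inside the PrivilegedAction: cannot throw a checked exception, so wrap and rethrow
    throw new UncheckedIOException(new IOException(gssCause));

    // back in onTrigger, outside doAs:
    try {
        processBatchOfFiles(files, context, session, flowFile);
        session.remove(flowFile);
    } catch (UncheckedIOException e) {
        handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
    }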
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 91e91ff7b1..9a60b2fcc5 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.hadoop;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
@@ -54,16 +53,14 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
-import org.ietf.jgss.GSSException;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
@@ -73,11 +70,8 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
/**
* This processor copies FlowFiles to HDFS.
@@ -352,18 +346,18 @@ public class PutHDFS extends AbstractHadoopProcessor {
                        case REPLACE_RESOLUTION:
                            if (hdfs.delete(copyFile, false)) {
                                getLogger().info("deleted {} in order to replace with the contents of {}",
-                                        new Object[]{copyFile, putFlowFile});
+                                        copyFile, putFlowFile);
                            }
                            break;
                        case IGNORE_RESOLUTION:
                            session.transfer(putFlowFile, getSuccessRelationship());
                            getLogger().info("transferring {} to success because file with same name already exists",
-                                    new Object[]{putFlowFile});
+                                    putFlowFile);
                            return null;
                        case FAIL_RESOLUTION:
                            session.transfer(session.penalize(putFlowFile), getFailureRelationship());
                            getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                    new Object[]{putFlowFile});
+                                    putFlowFile);
                            return null;
                        default:
                            break;
@@ -372,63 +366,58 @@ public class PutHDFS extends AbstractHadoopProcessor {
                // Write FlowFile to temp file on HDFS
                final StopWatch stopWatch = new StopWatch(true);
-                session.read(putFlowFile, new InputStreamCallback() {
-
-                    @Override
-                    public void process(InputStream in) throws IOException {
-                        OutputStream fos = null;
-                        Path createdFile = null;
-                        try {
-                            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                                fos = hdfs.append(copyFile, bufferSize);
-                            } else {
-                                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                                if (shouldIgnoreLocality(context, session)) {
-                                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                                }
+                session.read(putFlowFile, in -> {
+                    OutputStream fos = null;
+                    Path createdFile = null;
+                    try {
+                        if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                            fos = hdfs.append(copyFile, bufferSize);
+                        } else {
+                            final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);

-                                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                                        null, null);
+                            if (shouldIgnoreLocality(context, session)) {
+                                cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                            }
-                            if (codec != null) {
-                                fos = codec.createOutputStream(fos);
+                            fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                    FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                                    null, null);
+                        }
+
+                        if (codec != null) {
+                            fos = codec.createOutputStream(fos);
+                        }
+                        createdFile = actualCopyFile;
+                        BufferedInputStream bis = new BufferedInputStream(in);
+                        StreamUtils.copy(bis, fos);
+                        bis = null;
+                        fos.flush();
+                    } finally {
+                        try {
+                            if (fos != null) {
+                                fos.close();
                            }
-                            createdFile = actualCopyFile;
-                            BufferedInputStream bis = new BufferedInputStream(in);
-                            StreamUtils.copy(bis, fos);
-                            bis = null;
-                            fos.flush();
-                        } finally {
-                            try {
-                                if (fos != null) {
-                                    fos.close();
-                                }
-                            } catch (Throwable t) {
-                                // when talking to remote HDFS clusters, we don't notice problems until fos.close()
-                                if (createdFile != null) {
-                                    try {
-                                        hdfs.delete(createdFile, false);
-                                    } catch (Throwable ignore) {
-                                    }
+                        } catch (Throwable t) {
+                            // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+                            if (createdFile != null) {
+                                try {
+                                    hdfs.delete(createdFile, false);
+                                } catch (Throwable ignore) {
                                }
-                                throw t;
                            }
-                            fos = null;
+                            throw t;
                        }
+                        fos = null;
                    }
-
                });
                stopWatch.stop();
                final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                tempDotCopyFile = tempCopyFile;

-                if (
-                    writingStrategy.equals(WRITE_AND_RENAME)
-                    && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
+                if (
+                        writingStrategy.equals(WRITE_AND_RENAME)
+                        && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
                ) {
boolean renamed = false;
@@ -449,7 +438,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
}
getLogger().info("copied {} to HDFS at {} in {}
milliseconds at a rate of {}",
- new Object[]{putFlowFile, copyFile, millis,
dataRate});
+ putFlowFile, copyFile, millis, dataRate);
final String newFilename = copyFile.getName();
final String hdfsPath = copyFile.getParent().toString();
@@ -462,18 +451,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
session.transfer(putFlowFile, getSuccessRelationship());
-            } catch (final IOException e) {
-                Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
-                if (causeOptional.isPresent()) {
-                    getLogger().warn("An error occurred while connecting to HDFS. "
-                                    + "Rolling back session, and penalizing flow file {}",
-                            new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
-                    session.rollback(true);
-                } else {
-                    getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
-                    session.transfer(putFlowFile, getFailureRelationship());
-                }
            } catch (final Throwable t) {
+                if (handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                    return null;
+                }
if (tempDotCopyFile != null) {
try {
hdfs.delete(tempDotCopyFile, false);
@@ -548,21 +529,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
return group == null || group.isEmpty() ? null : group;
}
-    /**
-     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
-     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
-     * @param t The throwable to inspect for the cause.
-     * @return Throwable Cause
-     */
-    private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
-        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
-        return causalChain
-                .filter(expectedCauseType::isInstance)
-                .map(expectedCauseType::cast)
-                .filter(causePredicate)
-                .findFirst();
-    }
-
    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
        try {
            // Change owner and group of file if configured to do so
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/GSSExceptionRollbackYieldSessionHandler.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/GSSExceptionRollbackYieldSessionHandler.java
new file mode 100644
index 0000000000..8183c1f4cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/GSSExceptionRollbackYieldSessionHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.function.BiConsumer;
+
+public class GSSExceptionRollbackYieldSessionHandler implements BiConsumer<ProcessSession, ProcessContext> {
+    @Override
+    public void accept(ProcessSession processSession, ProcessContext processContext) {
+ processSession.rollback();
+ processContext.yield();
+ }
+}
\ No newline at end of file
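
Rollback-plus-yield is the only session policy shipped with this change: rollback() returns the FlowFile to its incoming queue unpenalized, and yield() keeps the processor from retrying in a tight loop while the ticket is refreshed. Because handleAuthErrors(...) accepts any BiConsumer<ProcessSession, ProcessContext>, other policies could be plugged in; a hypothetical penalizing variant, for illustration only (not part of this commit):

    public class PenalizingRollbackYieldSessionHandler implements BiConsumer<ProcessSession, ProcessContext> {
        @Override
        public void accept(ProcessSession session, ProcessContext context) {
            session.rollback(true); // true = penalize the rolled-back FlowFiles
            context.yield();
        }
    }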
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index af1df3d56b..7b59817d21 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -27,6 +27,10 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -37,13 +41,14 @@ import java.security.PrivilegedExceptionAction;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class GetHDFSSequenceFileTest {
- private HdfsResources hdfsResources;
+ private HdfsResources hdfsResourcesLocal;
private GetHDFSSequenceFile getHDFSSequenceFile;
private Configuration configuration;
private FileSystem fileSystem;
@@ -55,9 +60,8 @@ public class GetHDFSSequenceFileTest {
configuration = mock(Configuration.class);
fileSystem = mock(FileSystem.class);
userGroupInformation = mock(UserGroupInformation.class);
-        hdfsResources = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
-        getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
-        getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
+        hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
+        getHDFSSequenceFile = new TestableGetHDFSSequenceFile(new KerberosProperties(null), userGroupInformation);
reloginTried = false;
init();
}
@@ -75,6 +79,7 @@ public class GetHDFSSequenceFileTest {
    public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
        SequenceFileReader reader = mock(SequenceFileReader.class);
        Path file = mock(Path.class);
+        getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
        getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
        ArgumentCaptor<PrivilegedExceptionAction> privilegedExceptionActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedExceptionAction.class);
        verifyNoMoreInteractions(reader);
verifyNoMoreInteractions(reader);
@@ -86,7 +91,8 @@ public class GetHDFSSequenceFileTest {
@Test
public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
-        hdfsResources = new HdfsResources(configuration, fileSystem, null, null);
+        getHDFSSequenceFile = new TestableGetHDFSSequenceFile(new KerberosProperties(null), null);
+        hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, null, null);
init();
SequenceFileReader reader = mock(SequenceFileReader.class);
Path file = mock(Path.class);
@@ -94,10 +100,45 @@ public class GetHDFSSequenceFileTest {
verify(reader).readSequenceFile(file, configuration, fileSystem);
}
+    @Test
+    public void testGSSExceptionOnDoAs() throws Exception {
+        NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        GetHDFSSequenceFile testSubject = new TestableGetHDFSSequenceFile(getHDFSSequenceFile.kerberosProperties, userGroupInformation, true);
+        TestRunner runner = TestRunners.newTestRunner(testSubject);
+        runner.setProperty(GetHDFSSequenceFile.DIRECTORY, "path/does/not/exist");
+        runner.run();
+        // assert no flowfiles transferred to outgoing relationships
+        runner.assertTransferCount(MoveHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(MoveHDFS.REL_FAILURE, 0);
+    }
+
    public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
+
+        UserGroupInformation userGroupInformation;
+        private KerberosProperties kerberosProperties;
+
+        public TestableGetHDFSSequenceFile(KerberosProperties kerberosProperties, UserGroupInformation ugi) throws IOException {
+            this(kerberosProperties, ugi, false);
+        }
+
+        public TestableGetHDFSSequenceFile(KerberosProperties kerberosProperties, UserGroupInformation ugi, boolean failOnDoAs) throws IOException {
+            this.kerberosProperties = kerberosProperties;
+            this.userGroupInformation = ugi;
+            if(failOnDoAs && userGroupInformation != null) {
+                try {
+                    when(userGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenThrow(new IOException(new GSSException(13)));
+                } catch (InterruptedException e) {
+                    throw new IOException(e);
+                }
+            }
+        }
+
        @Override
        HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
-            return hdfsResources;
+            return hdfsResourcesLocal;
        }
@Override
@@ -109,5 +150,9 @@ public class GetHDFSSequenceFileTest {
        protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return kerberosProperties;
}
+
+ protected UserGroupInformation getUserGroupInformation() {
+ return userGroupInformation;
+ }
}
}
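
About the tests' magic number: new GSSException(13) builds exactly the failure the production check looks for, since 13 is the defined value of GSSException.NO_CRED in org.ietf.jgss. An equivalent, self-documenting construction (assuming only the major code matters to the check):

    GSSException noCred = new GSSException(GSSException.NO_CRED);
    assert noCred.getMajor() == 13; // matches the literal used in the tests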
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 903e38d750..436d2a053a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -31,6 +31,7 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -58,12 +59,11 @@ import static org.mockito.Mockito.when;
@DisabledOnOs(OS.WINDOWS)
public class GetHDFSTest {
- private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
@BeforeEach
public void setup() {
- mockNiFiProperties = mock(NiFiProperties.class);
+ NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
}
@@ -179,7 +179,7 @@ public class GetHDFSTest {
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
        flowFile.assertContentEquals(expected);
    }
@@ -198,7 +198,7 @@ public class GetHDFSTest {
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
        flowFile.assertContentEquals(expected);
    }
@@ -217,7 +217,7 @@ public class GetHDFSTest {
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
        flowFile.assertContentEquals(expected);
    }
@@ -236,7 +236,7 @@ public class GetHDFSTest {
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
        flowFile.assertContentEquals(expected);
        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -304,12 +304,12 @@ public class GetHDFSTest {
final Object result;
if (callCounter == 0) {
                when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists);
-                result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                result = ((PrivilegedExceptionAction<?>) invocationOnMock.getArgument(0)).run();
                verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
                verify(mockFileSystem).exists(any(Path.class));
            } else {
                when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]);
-                result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                result = ((PrivilegedExceptionAction<?>) invocationOnMock.getArgument(0)).run();
                verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
                verify(mockFileSystem).listStatus(any(Path.class));
            }
@@ -322,7 +322,24 @@ public class GetHDFSTest {
// THEN
verify(mockFileSystem).getUri();
-        verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation);
+        verifyNoMoreInteractions(mockUserGroupInformation);
+    }
+
+    @Test
+    public void testGSSExceptionOnExists() throws Exception {
+        FileSystem mockFileSystem = mock(FileSystem.class);
+        UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class);
+
+        GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation);
+        TestRunner runner = TestRunners.newTestRunner(testSubject);
+        runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
+        when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenThrow(new IOException(new GSSException(13)));
+        runner.run();
+
+        // Assert session rollback
+        runner.assertTransferCount(GetHDFS.REL_SUCCESS, 0);
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
    }
private static class TestableGetHDFS extends GetHDFS {
@@ -340,8 +357,8 @@ public class GetHDFSTest {
}
private static class TestableGetHDFSForUGI extends TestableGetHDFS {
-        private FileSystem mockFileSystem;
-        private UserGroupInformation mockUserGroupInformation;
+        private final FileSystem mockFileSystem;
+        private final UserGroupInformation mockUserGroupInformation;

        public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) {
            super(testKerberosProperties);
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
index 8e14507571..6a95d9d66d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -18,27 +18,38 @@ package org.apache.nifi.processors.hadoop;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import javax.security.sasl.SaslException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,7 +64,6 @@ public class MoveHDFSTest {
private static final String OUTPUT_DIRECTORY = "target/test-data-output";
private static final String TEST_DATA_DIRECTORY = "src/test/resources/testdata";
private static final String INPUT_DIRECTORY = "target/test-data-input";
- private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
@BeforeAll
@@ -63,7 +73,7 @@ public class MoveHDFSTest {
@BeforeEach
public void setup() {
- mockNiFiProperties = mock(NiFiProperties.class);
+ NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
}
@@ -244,6 +254,37 @@ public class MoveHDFSTest {
assertEquals(0, flowFiles.size());
}
+ @Test
+ public void testPutFileWithGSSException() throws IOException {
+ MockFileSystem noCredentialsFileSystem = new MockFileSystem() {
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ throw new IOException("ioe", new SaslException("sasle", new
GSSException(13)));
+ }
+ };
+ noCredentialsFileSystem.setFailOnExists(true);
+ TestRunner runner = TestRunners.newTestRunner(new
TestableMoveHDFS(kerberosProperties, noCredentialsFileSystem));
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE,
"input/does/not/exist");
+ runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, "target/test-classes");
+ runner.setProperty(MoveHDFS.CONFLICT_RESOLUTION, "replace");
+
+ try (FileInputStream fis = new
FileInputStream("src/test/resources/testdata/randombytes-1")) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+ runner.enqueue(fis, attributes);
+ runner.run();
+ }
+
+ // assert no flowfiles transferred to outgoing relationships
+ runner.assertTransferCount(MoveHDFS.REL_SUCCESS, 0);
+ runner.assertTransferCount(MoveHDFS.REL_FAILURE, 0);
+ // assert the processor's queue is not empty (session rollback)
+ assertFalse(runner.isQueueEmpty());
+ // assert that no files were penalized
+ runner.assertPenalizeCount(0);
+ noCredentialsFileSystem.setFailOnExists(false);
+ }
+
@Test
public void testPutWhenAlreadyExistingShouldFailWhenFAIL_RESOLUTION() throws IOException {
testPutWhenAlreadyExisting(MoveHDFS.FAIL_RESOLUTION, MoveHDFS.REL_FAILURE, "randombytes-1");
@@ -292,15 +333,30 @@ public class MoveHDFSTest {
private static class TestableMoveHDFS extends MoveHDFS {
- private KerberosProperties testKerberosProperties;
+ private final KerberosProperties testKerberosProperties;
+ private final FileSystem fileSystem;
public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
+ this(testKerberosProperties, null);
+ }
+
+ public TestableMoveHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) {
this.testKerberosProperties = testKerberosProperties;
+ this.fileSystem = fileSystem;
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProperties;
}
+
+ @Override
+ protected FileSystem getFileSystem(Configuration config) throws IOException {
+ return fileSystem == null ? super.getFileSystem(config) : fileSystem;
+ }
+ @Override
+ protected FileSystem getFileSystem() {
+ return fileSystem == null ? super.getFileSystem() : fileSystem;
+ }
}
}
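
Note how testPutFileWithGSSException builds its failure: an IOException wrapping a
SaslException wrapping a GSSException, mirroring how Hadoop's RPC/SASL layer typically
surfaces a missing or expired Kerberos ticket. The test therefore checks that detection
walks the full cause chain rather than only the direct cause. For example (sketch; major
code 13 is GSSException.NO_CRED):

    // Sketch: a nested auth failure like the one the SASL layer can raise.
    final IOException authFailure = new IOException("ioe",
            new SaslException("sasle", new GSSException(GSSException.NO_CRED)));
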
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index ab33df270e..637f312b56 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -17,24 +17,20 @@
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
@@ -47,13 +43,9 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import javax.security.sasl.SaslException;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -63,7 +55,6 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
@@ -75,7 +66,7 @@ public class PutHDFSTest {
private final static String FILE_NAME = "randombytes-1";
private KerberosProperties kerberosProperties;
- private FileSystem mockFileSystem;
+ private MockFileSystem mockFileSystem;
@BeforeEach
public void setup() {
@@ -359,19 +350,6 @@ public class PutHDFSTest {
// assert no flowfiles transferred to outgoing relationships
runner.assertTransferCount(PutHDFS.REL_SUCCESS, 0);
runner.assertTransferCount(PutHDFS.REL_FAILURE, 0);
- // assert the input flowfile was penalized
- List<MockFlowFile> penalizedFlowFiles = runner.getPenalizedFlowFiles();
- assertEquals(1, penalizedFlowFiles.size());
- assertEquals("randombytes-1",
penalizedFlowFiles.iterator().next().getAttribute(CoreAttributes.FILENAME.key()));
- // assert the processor's queue is not empty
- assertFalse(runner.isQueueEmpty());
- assertEquals(1, runner.getQueueSize().getObjectCount());
- // assert the input file is back on the queue
- ProcessSession session =
runner.getProcessSessionFactory().createSession();
- FlowFile queuedFlowFile = session.get();
- assertNotNull(queuedFlowFile);
- assertEquals("randombytes-1",
queuedFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
- session.rollback();
}
@Test
@@ -610,7 +588,8 @@ public class PutHDFSTest {
@Test
public void testPutFileWithCloseException() throws IOException {
- mockFileSystem = new MockFileSystem(true);
+ mockFileSystem = new MockFileSystem();
+ mockFileSystem.setFailOnClose(true);
String dirName = "target/testPutFileCloseException";
File file = new File(dirName);
file.mkdirs();
@@ -635,10 +614,38 @@ public class PutHDFSTest {
mockFileSystem.delete(p, true);
}
+ @Test
+ public void testPutFileWithCreateException() throws IOException {
+ mockFileSystem = new MockFileSystem();
+ mockFileSystem.setFailOnCreate(true);
+ String dirName = "target/testPutFileCreateException";
+ File file = new File(dirName);
+ file.mkdirs();
+ Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
+
+ TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem));
+ runner.setProperty(PutHDFS.DIRECTORY, dirName);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+
+ try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+ runner.enqueue(fis, attributes);
+ runner.run();
+ }
+
+ List<MockFlowFile> failedFlowFiles = runner
+ .getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+ assertFalse(failedFlowFiles.isEmpty());
+ assertTrue(failedFlowFiles.get(0).isPenalized());
+
+ mockFileSystem.delete(p, true);
+ }
+
private class TestablePutHDFS extends PutHDFS {
- private KerberosProperties testKerberosProperties;
- private FileSystem fileSystem;
+ private final KerberosProperties testKerberosProperties;
+ private final FileSystem fileSystem;
TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) {
this.testKerberosProperties = testKerberosProperties;
@@ -661,134 +668,4 @@ public class PutHDFSTest {
return fileSystem;
}
}
-
- private static class MockFileSystem extends FileSystem {
- private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
- private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
- private final boolean failOnClose;
-
- public MockFileSystem() {
- failOnClose = false;
- }
-
- public MockFileSystem(boolean failOnClose) {
- this.failOnClose = failOnClose;
- }
-
- public void setAcl(final Path path, final List<AclEntry> aclSpec) {
- pathToAcl.put(path, aclSpec);
- }
-
- @Override
- public AclStatus getAclStatus(final Path path) {
- return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
- }
-
- @Override
- public URI getUri() {
- return URI.create("file:///");
- }
-
- @Override
- public FSDataInputStream open(final Path f, final int bufferSize) {
- return null;
- }
-
- @Override
- public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
- final long blockSize, final Progressable progress) {
- pathToStatus.put(f, newFile(f, permission));
- if(failOnClose) {
- return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
- @Override
- public void close() throws IOException {
- super.close();
- throw new IOException("Fail on close");
- }
- };
- } else {
- return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
- }
- }
-
- @Override
- public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
- return null;
- }
-
- @Override
- public boolean rename(final Path src, final Path dst) {
- if (pathToStatus.containsKey(src)) {
- pathToStatus.put(dst, pathToStatus.remove(src));
- } else {
- return false;
- }
- return true;
- }
-
- @Override
- public boolean delete(final Path f, final boolean recursive) {
- if (pathToStatus.containsKey(f)) {
- pathToStatus.remove(f);
- } else {
- return false;
- }
- return true;
- }
-
- @Override
- public FileStatus[] listStatus(final Path f) {
- return null;
- }
-
- @Override
- public void setWorkingDirectory(final Path new_dir) {
-
- }
-
- @Override
- public Path getWorkingDirectory() {
- return new Path(new File(".").getAbsolutePath());
- }
-
- @Override
- public boolean mkdirs(final Path f, final FsPermission permission) {
- return false;
- }
-
- @Override
- public boolean mkdirs(Path f) {
- pathToStatus.put(f, newDir(f));
- return true;
- }
-
- @Override
- public FileStatus getFileStatus(final Path f) throws IOException {
- final FileStatus fileStatus = pathToStatus.get(f);
- if (fileStatus == null) throw new FileNotFoundException();
- return fileStatus;
- }
-
- @Override
- public boolean exists(Path f) {
- return pathToStatus.containsKey(f);
- }
-
- private FileStatus newFile(Path p, FsPermission permission) {
- return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
- }
-
- private FileStatus newDir(Path p) {
- return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
- }
-
- @Override
- public long getDefaultBlockSize(Path f) {
- return 33554432L;
- }
- }
-
- static FsPermission perms(short p) {
- return new FsPermission(p);
- }
}
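
With the inner MockFileSystem deleted here (its twin in TestGetHDFSFileInfo is removed
below), these tests share the org.apache.nifi.processors.hadoop.util.MockFileSystem
introduced at the end of this patch. Its explicit fail flags replace the old boolean
constructor, so migrated call sites read like this (sketch):

    final MockFileSystem fileSystem = new MockFileSystem();
    fileSystem.setFailOnClose(true);   // was: new MockFileSystem(true)
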
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 6b64cca767..11b7c037de 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -27,6 +27,7 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -45,13 +46,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestDeleteHDFS {
- private NiFiProperties mockNiFiProperties;
private FileSystem mockFileSystem;
private KerberosProperties kerberosProperties;
@BeforeEach
public void setup() throws Exception {
- mockNiFiProperties = mock(NiFiProperties.class);
+ NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
mockFileSystem = mock(FileSystem.class);
@@ -114,6 +114,22 @@ public class TestDeleteHDFS {
runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
}
+ @Test
+ public void testGSSException() throws Exception {
+ Path filePath = new Path("/some/path/to/file.txt");
+ when(mockFileSystem.exists(any(Path.class))).thenThrow(new IOException(new GSSException(13)));
+ DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+ TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+ runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
+ Map<String, String> attributes = Maps.newHashMap();
+ attributes.put("hdfs.file", filePath.toString());
+ runner.enqueue("foo", attributes);
+ runner.run();
+ // GSS Auth exceptions should cause rollback
+ runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+ runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
+ }
+
@Test
public void testPermissionIOException() throws Exception {
Path filePath = new Path("/some/path/to/file.txt");
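
For contrast with testGSSException above, testPermissionIOException below still expects
routing to REL_FAILURE: only failures carrying a GSSException in their cause chain take
the new rollback-and-yield path. The dividing line, sketched with illustrative names and
the hasGssCause helper outlined earlier (not the committed control flow):

    try {
        fileSystem.exists(path);
    } catch (final IOException e) {
        if (hasGssCause(e)) {
            context.yield();      // back off and retry after a fresh Kerberos login
            session.rollback();   // the flow file returns to the incoming queue
        } else {
            session.transfer(session.penalize(flowFile), DeleteHDFS.REL_FAILURE);
        }
    }
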
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 43f0c3ce6d..036904312c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -16,8 +16,10 @@
*/
package org.apache.nifi.processors.hadoop;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
@@ -44,17 +46,15 @@ import static org.mockito.Mockito.when;
public class TestFetchHDFS {
private TestRunner runner;
- private TestableFetchHDFS proc;
- private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
@BeforeEach
public void setup() {
- mockNiFiProperties = mock(NiFiProperties.class);
+ NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
- proc = new TestableFetchHDFS(kerberosProperties);
+ TestableFetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
runner = TestRunners.newTestRunner(proc);
}
@@ -63,7 +63,7 @@ public class TestFetchHDFS {
final String file = "src/test/resources/testdata/randombytes-1";
final String fileWithMultipliedSeparators = "src/test////resources//testdata/randombytes-1";
runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators);
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -83,7 +83,7 @@ public class TestFetchHDFS {
final String file = destination.getAbsolutePath();
final String fileWithMultipliedSeparators = "/" + file;
runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators);
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -98,7 +98,7 @@ public class TestFetchHDFS {
public void testFetchStaticFileThatDoesNotExist() {
final String file = "src/test/resources/testdata/doesnotexist";
runner.setProperty(FetchHDFS.FILENAME, file);
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
}
@@ -111,7 +111,7 @@ public class TestFetchHDFS {
final Map<String,String> attributes = new HashMap<>();
attributes.put("my.file", file);
- runner.enqueue(new String("trigger flow file"), attributes);
+ runner.enqueue("trigger flow file", attributes);
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
}
@@ -120,7 +120,7 @@ public class TestFetchHDFS {
public void testFilenameWithValidEL() {
final String file = "src/test/resources/testdata/${literal('randombytes-1')}";
runner.setProperty(FetchHDFS.FILENAME, file);
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
}
@@ -136,7 +136,7 @@ public class TestFetchHDFS {
public void testFilenameWithUnrecognizedEL() {
final String file = "data_${literal('testing'):substring(0,4)%7D";
runner.setProperty(FetchHDFS.FILENAME, file);
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
}
@@ -147,14 +147,14 @@ public class TestFetchHDFS {
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
- assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+ assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
flowFile.assertContentEquals(expected);
}
@@ -165,14 +165,14 @@ public class TestFetchHDFS {
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
- assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+ assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
flowFile.assertContentEquals(expected);
}
@@ -183,28 +183,58 @@ public class TestFetchHDFS {
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/13545423550275052.zip");
runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
- runner.enqueue(new String("trigger flow file"));
+ runner.enqueue("trigger flow file");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
- assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+ assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
flowFile.assertContentEquals(expected);
}
+ @Test
+ public void testGSSException() throws IOException {
+ MockFileSystem fileSystem = new MockFileSystem();
+ fileSystem.setFailOnOpen(true);
+ FetchHDFS proc = new TestableFetchHDFS(kerberosProperties, fileSystem);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+ runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+ runner.enqueue("trigger flow file");
+ runner.run();
+
+ runner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0);
+ runner.assertTransferCount(FetchHDFS.REL_FAILURE, 0);
+ runner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 0);
+ // assert that no files were penalized
+ runner.assertPenalizeCount(0);
+ fileSystem.setFailOnOpen(false);
+ }
+
private static class TestableFetchHDFS extends FetchHDFS {
private final KerberosProperties testKerberosProps;
+ private final FileSystem fileSystem;
public TestableFetchHDFS(KerberosProperties testKerberosProps) {
this.testKerberosProps = testKerberosProps;
+ this.fileSystem = null;
+ }
+ public TestableFetchHDFS(KerberosProperties testKerberosProps, final FileSystem fileSystem) {
+ this.testKerberosProps = testKerberosProps;
+ this.fileSystem = fileSystem;
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProps;
}
+
+ @Override
+ protected FileSystem getFileSystem() {
+ return fileSystem == null ? super.getFileSystem() : fileSystem;
+ }
}
}
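
The FetchHDFS test drives the failure through MockFileSystem.setFailOnOpen rather than a
Mockito stub, and additionally asserts that REL_COMMS_FAILURE stays empty: communication
failures are retried through that relationship, while auth failures must instead be
retried after a fresh Kerberos login on a later trigger. A rough sketch of the relogin
step that retry relies on, using NiFi's KerberosUser API (the accessor name here is
hypothetical; see AbstractHadoopProcessor for the committed wiring):

    final KerberosUser kerberosUser = getKerberosUser();   // hypothetical accessor
    if (kerberosUser != null) {
        kerberosUser.checkTGTAndRelogin();   // re-acquires the TGT when expired
    }
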
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
index 0235f73940..2b23e02939 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
@@ -17,17 +17,23 @@
package org.apache.nifi.processors.hadoop;
import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
@@ -36,21 +42,6 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -257,6 +248,28 @@ public class TestGetHDFSFileInfo {
mff.assertAttributeEquals("hdfs.status", "Failed due to:
java.io.InterruptedIOException");
}
+ @Test
+ public void testWithGSSException() {
+ proc.fileSystem.setFailOnExists(true);
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
+ runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "false");
+ runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true");
+ runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true");
+ runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT);
+
+ runner.run();
+
+ // Assert session rollback
+ runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0);
+ runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 0);
+ runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0);
+ runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0);
+
+ proc.fileSystem.setFailOnExists(false);
+ }
+
@Test
public void testRunWithPermissionsExceptionAttributes() throws InterruptedException {
@@ -789,135 +802,4 @@ public class TestGetHDFSFileInfo {
return fileSystem;
}
}
-
- private class MockFileSystem extends FileSystem {
- private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
- private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
-
- public void addFileStatus(final FileStatus parent, final FileStatus child) {
- Set<FileStatus> children = fileStatuses.get(parent.getPath());
- if (children == null) {
- children = new HashSet<>();
- fileStatuses.put(parent.getPath(), children);
- }
- if (child != null) {
- children.add(child);
- if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
- fileStatuses.put(child.getPath(), new HashSet<FileStatus>());
- }
- }
-
- pathToStatus.put(parent.getPath(), parent);
- pathToStatus.put(child.getPath(), child);
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public long getDefaultBlockSize() {
- return 1024L;
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public short getDefaultReplication() {
- return 1;
- }
-
- @Override
- public URI getUri() {
- return null;
- }
-
- @Override
- public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
- return null;
- }
-
- @Override
- public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
- final long blockSize, final Progressable progress) throws IOException {
- return null;
- }
-
- @Override
- public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
- return null;
- }
-
- @Override
- public boolean rename(final Path src, final Path dst) throws IOException {
- return false;
- }
-
- @Override
- public boolean delete(final Path f, final boolean recursive) throws IOException {
- return false;
- }
-
- @Override
- public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
- if (!fileStatuses.containsKey(f)) {
- throw new FileNotFoundException();
- }
- if (f.getName().startsWith("list_exception_")) {
- String clzName = f.getName().substring("list_exception_".length(), f.getName().length());
- IOException exception = null;
- try {
- exception = (IOException)Class.forName(clzName).newInstance();
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- throw exception;
- }
- final Set<FileStatus> statuses = fileStatuses.get(f);
- if (statuses == null) {
- return new FileStatus[0];
- }
-
- for (FileStatus s : statuses) {
- getFileStatus(s.getPath()); //support exception handling only.
- }
-
- return statuses.toArray(new FileStatus[statuses.size()]);
- }
-
- @Override
- public void setWorkingDirectory(final Path new_dir) {
-
- }
-
- @Override
- public Path getWorkingDirectory() {
- return new Path(new File(".").getAbsolutePath());
- }
-
- @Override
- public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
- return false;
- }
-
- @Override
- public FileStatus getFileStatus(final Path f) throws IOException {
- if (f!=null && f.getName().startsWith("exception_")) {
- String clzName = f.getName().substring("exception_".length(), f.getName().length());
- IOException exception = null;
- try {
- exception = (IOException)Class.forName(clzName).newInstance();
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- throw exception;
- }
- final FileStatus fileStatus = pathToStatus.get(f);
- if (fileStatus == null) throw new FileNotFoundException();
- return fileStatus;
- }
-
- public FileStatus newFile(String p) {
- return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
- }
- public FileStatus newDir(String p) {
- return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
- }
- }
}
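
TestGetHDFSFileInfo keeps its reflective error-injection convention, which moves into the
shared mock unchanged: a path component named exception_<fqcn> (or list_exception_<fqcn>
for listings) makes the mock instantiate and throw that IOException. For example (sketch):

    // Sketch: the shared mock reflectively throws the exception named in the path.
    fileSystem.getFileStatus(new Path("/mydir/exception_java.io.InterruptedIOException"));
    // -> throws java.io.InterruptedIOException
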
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
new file mode 100644
index 0000000000..3a3477502e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.Progressable;
+import org.ietf.jgss.GSSException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MockFileSystem extends FileSystem {
+ private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+ private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
+ private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+ private boolean failOnOpen;
+ private boolean failOnClose;
+ private boolean failOnCreate;
+ private boolean failOnFileStatus;
+ private boolean failOnExists;
+
+
+ public void setFailOnClose(final boolean failOnClose) {
+ this.failOnClose = failOnClose;
+ }
+
+ public void setFailOnCreate(final boolean failOnCreate) {
+ this.failOnCreate = failOnCreate;
+ }
+
+ public void setFailOnFileStatus(final boolean failOnFileStatus) {
+ this.failOnFileStatus = failOnFileStatus;
+ }
+
+ public void setFailOnExists(final boolean failOnExists) {
+ this.failOnExists = failOnExists;
+ }
+
+ public void setFailOnOpen(final boolean failOnOpen) {
+ this.failOnOpen = failOnOpen;
+ }
+
+ public void setAcl(final Path path, final List<AclEntry> aclSpec) {
+ pathToAcl.put(path, aclSpec);
+ }
+
+ @Override
+ public AclStatus getAclStatus(final Path path) {
+ return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
+ }
+
+ @Override
+ public URI getUri() {
+ return URI.create("file:///");
+ }
+
+ @Override
+ public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
+ if (failOnOpen) {
+ throw new IOException(new GSSException(13));
+ }
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
+ final long blockSize, final Progressable progress) throws IOException {
+ if (failOnCreate) {
+ // Simulate an AuthenticationException wrapped in an IOException
+ throw new IOException(new AuthenticationException("test auth error"));
+ }
+ pathToStatus.put(f, newFile(f, permission));
+ if(failOnClose) {
+ return new FSDataOutputStream(new ByteArrayOutputStream(), new FileSystem.Statistics("")) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ throw new IOException("Fail on close");
+ }
+ };
+ } else {
+ return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
+ }
+ }
+
+ @Override
+ public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
+ return null;
+ }
+
+ @Override
+ public boolean rename(final Path src, final Path dst) {
+ if (pathToStatus.containsKey(src)) {
+ pathToStatus.put(dst, pathToStatus.remove(src));
+ } else {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean delete(final Path f, final boolean recursive) {
+ if (pathToStatus.containsKey(f)) {
+ pathToStatus.remove(f);
+ } else {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void setWorkingDirectory(final Path new_dir) {
+
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return new Path(new File(".").getAbsolutePath());
+ }
+
+ @Override
+ public boolean mkdirs(final Path f, final FsPermission permission) {
+ return false;
+ }
+
+ @Override
+ public boolean mkdirs(Path f) {
+ pathToStatus.put(f, newDir(f));
+ return true;
+ }
+
+ @Override
+ public FileStatus getFileStatus(final Path path) throws IOException {
+ if (failOnFileStatus) {
+ throw new IOException(new GSSException(13));
+ }
+ if (path != null && path.getName().startsWith("exception_")) {
+ final String className = path.getName().substring("exception_".length());
+ final IOException exception;
+ try {
+ exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance();
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ throw exception;
+ }
+
+ final FileStatus fileStatus = pathToStatus.get(path);
+ if (fileStatus == null) {
+ throw new FileNotFoundException();
+ }
+ return fileStatus;
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ if (failOnExists) {
+ throw new IOException(new GSSException(13));
+ }
+ return pathToStatus.containsKey(f);
+ }
+
+ public FileStatus newFile(Path p, FsPermission permission) {
+ return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
+ }
+
+ public FileStatus newDir(Path p) {
+ return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
+ }
+
+ public FileStatus newFile(String p) {
+ return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
+ }
+ public FileStatus newDir(String p) {
+ return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
+ }
+
+ @Override
+ public long getDefaultBlockSize(Path f) {
+ return 33554432L;
+ }
+
+ public void addFileStatus(final FileStatus parent, final FileStatus child) {
+ Set<FileStatus> children = fileStatuses.computeIfAbsent(parent.getPath(), k -> new HashSet<>());
+ if (child != null) {
+ children.add(child);
+ if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
+ fileStatuses.put(child.getPath(), new HashSet<>());
+ }
+ }
+
+ pathToStatus.put(parent.getPath(), parent);
+ pathToStatus.put(child.getPath(), child);
+ }
+
+ @Override
+ public FileStatus[] listStatus(final Path f) throws IOException {
+ if (!fileStatuses.containsKey(f)) {
+ throw new FileNotFoundException();
+ }
+
+ if (f.getName().startsWith("list_exception_")) {
+ final String className = f.getName().substring("list_exception_".length());
+ final IOException exception;
+ try {
+ exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance();
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ throw exception;
+ }
+
+ final Set<FileStatus> statuses = fileStatuses.get(f);
+ if (statuses == null) {
+ return new FileStatus[0];
+ }
+
+ for (FileStatus s : statuses) {
+ getFileStatus(s.getPath()); //support exception handling only.
+ }
+
+ return statuses.toArray(new FileStatus[0]);
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public long getDefaultBlockSize() {
+ return 1024L;
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public short getDefaultReplication() {
+ return 1;
+ }
+
+
+ private static FsPermission perms(short p) {
+ return new FsPermission(p);
+ }
+}
\ No newline at end of file
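
For reference, the fail flags of the shared MockFileSystem map to these simulated
failures (as implemented above):

    final MockFileSystem fileSystem = new MockFileSystem();
    fileSystem.setFailOnOpen(true);        // open()          -> IOException(GSSException(13))
    fileSystem.setFailOnExists(true);      // exists()        -> IOException(GSSException(13))
    fileSystem.setFailOnFileStatus(true);  // getFileStatus() -> IOException(GSSException(13))
    fileSystem.setFailOnCreate(true);      // create()        -> IOException(AuthenticationException)
    fileSystem.setFailOnClose(true);       // the created stream throws IOException on close()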