This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new d1579b4d21 NIFI-13121: Handle runtime exceptions in FetchHDFS
d1579b4d21 is described below
commit d1579b4d21335c1e200de079cde151f670764c01
Author: Matt Burgess <[email protected]>
AuthorDate: Wed May 1 16:52:57 2024 -0400
NIFI-13121: Handle runtime exceptions in FetchHDFS
Signed-off-by: Joe Gresock <[email protected]>
This closes #8727.
---
.../apache/nifi/processors/hadoop/FetchHDFS.java | 7 +++-
.../nifi/processors/hadoop/TestFetchHDFS.java | 19 +++++++++
.../processors/hadoop/util/MockFileSystem.java | 48 +++++++++++++++++++++-
3 files changed, 72 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index e026c01862..ce551acb75 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -43,6 +43,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import
org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
@@ -175,7 +176,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
outgoingFlowFile = session.putAttribute(outgoingFlowFile,
CoreAttributes.FILENAME.key(), outputFilename);
stopWatch.stop();
- getLogger().info("Successfully received content from {} for {}
in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
+ getLogger().info("Successfully received content from {} for {}
in {}", qualifiedPath, outgoingFlowFile, stopWatch.getDuration());
outgoingFlowFile = session.putAttribute(outgoingFlowFile,
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(outgoingFlowFile,
qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(outgoingFlowFile, getSuccessRelationship());
@@ -190,6 +191,10 @@ public class FetchHDFS extends AbstractHadoopProcessor {
outgoingFlowFile = session.penalize(outgoingFlowFile);
session.transfer(outgoingFlowFile,
getCommsFailureRelationship());
}
+ } catch (FlowFileAccessException ffae) {
+ getLogger().error("Failed to retrieve S3 Object for {};
routing to failure", outgoingFlowFile, ffae);
+ outgoingFlowFile = session.penalize(outgoingFlowFile);
+ session.transfer(outgoingFlowFile,
getCommsFailureRelationship());
} finally {
IOUtils.closeQuietly(stream);
}
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 036904312c..9845c61e28 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -214,6 +214,25 @@ public class TestFetchHDFS {
fileSystem.setFailOnOpen(false);
}
+ /**
+  * Verifies how FetchHDFS reacts when the mock file system is told to fail
+  * with a runtime exception on open: nothing may reach success or failure,
+  * exactly one FlowFile must go to comms failure, and it must be penalized.
+  */
+ @Test
+ public void testRuntimeException() {
+     final MockFileSystem failingFileSystem = new MockFileSystem();
+     failingFileSystem.setRuntimeFailOnOpen(true);
+
+     final TestRunner testRunner = TestRunners.newTestRunner(new TestableFetchHDFS(kerberosProperties, failingFileSystem));
+     testRunner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+     testRunner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+     testRunner.enqueue("trigger flow file");
+     testRunner.run();
+
+     // The single incoming FlowFile must end up on comms failure only.
+     testRunner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 1);
+     testRunner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0);
+     testRunner.assertTransferCount(FetchHDFS.REL_FAILURE, 0);
+     // assert that the file was penalized
+     testRunner.assertPenalizeCount(1);
+
+     failingFileSystem.setRuntimeFailOnOpen(false);
+ }
+
private static class TestableFetchHDFS extends FetchHDFS {
private final KerberosProperties testKerberosProps;
private final FileSystem fileSystem;
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
index f020f8b3b5..d5c196f0ee 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -26,6 +27,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import
org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.ietf.jgss.GSSException;
import java.io.ByteArrayOutputStream;
@@ -49,6 +51,7 @@ public class MockFileSystem extends FileSystem {
private final Map<Path, FSDataOutputStream> pathToOutputStream = new
HashMap<>();
private boolean failOnOpen;
+ private boolean runtimeFailOnOpen;
private boolean failOnClose;
private boolean failOnCreate;
private boolean failOnFileStatus;
@@ -74,6 +77,10 @@ public class MockFileSystem extends FileSystem {
this.failOnOpen = failOnOpen;
}
+ /**
+  * Stores the flag that the open path of this mock checks before handing
+  * out a stream; while it is {@code true} that path throws an unchecked
+  * {@code FlowFileAccessException("runtime")} instead of returning.
+  *
+  * @param runtimeFailOnOpen {@code true} to make opens fail with the runtime
+  *                          exception, {@code false} to restore normal opens
+  */
+ public void setRuntimeFailOnOpen(final boolean runtimeFailOnOpen) {
+ this.runtimeFailOnOpen = runtimeFailOnOpen;
+ }
+
public void setAcl(final Path path, final List<AclEntry> aclSpec) {
pathToAcl.put(path, aclSpec);
}
@@ -93,7 +100,10 @@ public class MockFileSystem extends FileSystem {
if (failOnOpen) {
throw new IOException(new GSSException(13));
}
- return null;
+ if (runtimeFailOnOpen) {
+ throw new FlowFileAccessException("runtime");
+ }
+ return createInputStream(f);
}
@Override
@@ -190,6 +200,19 @@ public class MockFileSystem extends FileSystem {
return pathToStatus.containsKey(f);
}
+ /**
+  * Builds the stream handed back to callers that open a path on this mock.
+  * The stream always wraps a fresh {@code StubFSInputStream}. When
+  * {@code failOnClose} is set, the returned stream's {@code close()} first
+  * delegates to the real close and then throws
+  * {@code IOException("Fail on close")}.
+  *
+  * @param f the path being opened; not consulted by this implementation
+  * @return a data input stream over the stub source
+  * @throws IOException declared for signature compatibility; nothing in this body throws it directly
+  */
+ private FSDataInputStream createInputStream(final Path f) throws IOException {
+     final FSInputStream stubSource = new StubFSInputStream();
+     if (!failOnClose) {
+         return new FSDataInputStream(stubSource);
+     }
+     return new FSDataInputStream(stubSource) {
+         @Override
+         public void close() throws IOException {
+             super.close();
+             throw new IOException("Fail on close");
+         }
+     };
+ }
private FSDataOutputStream createOutputStream() {
if(failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new
Statistics("")) {
@@ -294,4 +317,27 @@ public class MockFileSystem extends FileSystem {
private static FsPermission perms(short p) {
return new FsPermission(p);
}
+
+ /**
+  * Minimal {@code FSInputStream} used as the backing source for streams the
+  * mock hands out. It behaves as an always-empty stream: {@link #read()}
+  * reports end-of-stream immediately, seeking is a no-op and the position
+  * is always zero.
+  * <p>
+  * Declared {@code static} because it uses no state of the enclosing
+  * {@code MockFileSystem}, so it need not hold a reference to one.
+  */
+ private static class StubFSInputStream extends FSInputStream {
+
+     /** Accepts any position and does nothing. */
+     @Override
+     public void seek(final long pos) throws IOException {
+         // intentionally empty: there is no content to position within
+     }
+
+     /** @return always {@code 0} */
+     @Override
+     public long getPos() throws IOException {
+         return 0;
+     }
+
+     /** @return always {@code true}, regardless of the requested position */
+     @Override
+     public boolean seekToNewSource(final long targetPos) throws IOException {
+         return true;
+     }
+
+     /** @return always {@code -1}, i.e. end of stream */
+     @Override
+     public int read() throws IOException {
+         return -1;
+     }
+ }
}
\ No newline at end of file