This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb8ce7 NIFI-7139 Add record.error.message on failure of a record
reader or writer
0bb8ce7 is described below
commit 0bb8ce7438d9855dcca6bf89e3a672d1f9477593
Author: Shawn Weeks <[email protected]>
AuthorDate: Thu Feb 13 08:39:49 2020 -0600
NIFI-7139 Add record.error.message on failure of a record reader or writer
Handle scenario where message might be null.
Update to test case that was failing because adding attributes modified a
flow file even if you don't change the contents.
Fixed Style Issues and Updated WritesAttributes.
Added Test Case for Error Message
Signed-off-by: Matthew Burgess <[email protected]>
This closes #4052
---
.../nifi/processors/standard/AbstractRecordProcessor.java | 9 +++++++++
.../org/apache/nifi/processors/standard/ConvertRecord.java | 3 ++-
.../org/apache/nifi/processors/standard/UpdateRecord.java | 6 +++++-
.../apache/nifi/processors/standard/TestConvertRecord.java | 11 ++++++-----
4 files changed, 22 insertions(+), 7 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 8ccea5a..1ea70e2 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -174,6 +174,15 @@ public abstract class AbstractRecordProcessor extends
AbstractProcessor {
});
} catch (final Exception e) {
getLogger().error("Failed to process {}; will route to failure",
new Object[] {flowFile, e});
+ // Since we are wrapping the exceptions above there should always
be a cause
+ // but it's possible it might not have a message. This handles
that by logging
+ // the name of the class thrown.
+ Throwable c = e.getCause();
+ if (c != null) {
+ session.putAttribute(flowFile, "record.error.message",
(c.getLocalizedMessage() != null) ? c.getLocalizedMessage() :
c.getClass().getCanonicalName() + " Thrown");
+ } else {
+ session.putAttribute(flowFile, "record.error.message",
e.getClass().getCanonicalName() + " Thrown");
+ }
session.transfer(flowFile, REL_FAILURE);
return;
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index 1be1794..a1e6f99 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -41,7 +41,8 @@ import java.util.List;
@Tags({"convert", "record", "generic", "schema", "json", "csv", "avro", "log",
"logs", "freeform", "text"})
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the
mime.type attribute to the MIME Type specified by the Record Writer"),
- @WritesAttribute(attribute = "record.count", description = "The number of
records in the FlowFile")
+ @WritesAttribute(attribute = "record.count", description = "The number of
records in the FlowFile"),
+ @WritesAttribute(attribute = "record.error.message", description = "This
attribute provides on failure the error message encountered by the Reader or
Writer.")
})
@CapabilityDescription("Converts records from one data format to another using
configured Record Reader and Record Write Controller Services. "
+ "The Reader and Writer must be configured with \"matching\" schemas. By
this, we mean the schemas must have the same field names. The types of the
fields "
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 8ee5f43..65abdc9 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -23,6 +23,7 @@ import
org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -67,7 +68,10 @@ import java.util.stream.Stream;
+ "This Processor requires that at least one user-defined Property be
added. The name of the Property should indicate a RecordPath that determines
the field that should "
+ "be updated. The value of the Property is either a replacement value
(optionally making use of the Expression Language) or is itself a RecordPath
that extracts a value from "
+ "the Record. Whether the Property value is determined to be a RecordPath
or a literal value depends on the configuration of the <Replacement Value
Strategy> Property.")
-@WritesAttribute(attribute = "record.index", description = "This attribute
provides the current row index and is only available inside the literal value
expression.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "record.index", description = "This attribute
provides the current row index and is only available inside the literal value
expression."),
+ @WritesAttribute(attribute = "record.error.message", description = "This
attribute provides on failure the error message encountered by the Reader or
Writer.")
+})
@SeeAlso({ConvertRecord.class})
public class UpdateRecord extends AbstractRecordProcessor {
private static final String FIELD_NAME = "field.name";
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 822f664..7870a04 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -18,7 +18,6 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -120,7 +119,7 @@ public class TestConvertRecord {
}
@Test
- public void testReadFailure() throws InitializationException {
+ public void testReadFailure() throws InitializationException, IOException {
final MockRecordParser readerService = new MockRecordParser(2);
final MockRecordWriter writerService = new MockRecordWriter("header",
false);
@@ -146,12 +145,13 @@ public class TestConvertRecord {
// Original FlowFile should be routed to 'failure' relationship
without modification
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
final MockFlowFile out =
runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
- assertTrue(original == out);
+ out.assertContentEquals(original.toByteArray());
+ out.assertAttributeEquals("record.error.message","Intentional Unit
Test Exception because 2 records have been read");
}
@Test
- public void testWriteFailure() throws InitializationException {
+ public void testWriteFailure() throws InitializationException, IOException
{
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header",
false, 2);
@@ -177,7 +177,8 @@ public class TestConvertRecord {
// Original FlowFile should be routed to 'failure' relationship
without modification
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
final MockFlowFile out =
runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
- assertTrue(original == out);
+ out.assertContentEquals(original.toByteArray());
+ out.assertAttributeEquals("record.error.message","Unit Test
intentionally throwing IOException after 2 records were written");
}
@Test