This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 04711ab NIFI-7477 Optionally adding validation details as a new attribute of the flowfile 04711ab is described below commit 04711ab466a2b2c8c3b618f7617fc243c6576156 Author: jahenaor <jairohenaoro...@gmail.com> AuthorDate: Wed May 27 22:55:25 2020 -0500 NIFI-7477 Optionally adding validation details as a new attribute of the flowfile NIFI-7477 Improving description and unit test now verifies attribute content NIFI-7477: Fixed checkstyle errors Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #4301 --- .../nifi/processors/standard/ValidateRecord.java | 46 ++++++++++++++++++++-- .../processors/standard/TestValidateRecord.java | 46 ++++++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index b3255e6..1210eec 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -180,6 +180,26 @@ public class ValidateRecord extends AbstractProcessor { .defaultValue("true") .required(true) .build(); + static final PropertyDescriptor VALIDATION_DETAILS_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("validation-details-attribute-name") + .displayName("Validation Details Attribute Name") + .description("If specified, when a validation error occurs, this attribute name will be used to leave the details. The number of characters will be limited " + + "by the property 'Maximum Validation Details Length'.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .defaultValue(null) + .build(); + static final PropertyDescriptor MAX_VALIDATION_DETAILS_LENGTH = new PropertyDescriptor.Builder() + .name("maximum-validation-details-length") + .displayName("Maximum Validation Details Length") + .description("Specifies the maximum number of characters that validation details value can have. Any characters beyond the max will be truncated. " + + "This property is only used if 'Validation Details Attribute Name' is set") + .required(false) + .defaultValue("1024") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); static final Relationship REL_VALID = new Relationship.Builder() .name("valid") @@ -207,6 +227,8 @@ public class ValidateRecord extends AbstractProcessor { properties.add(SCHEMA_TEXT); properties.add(ALLOW_EXTRA_FIELDS); properties.add(STRICT_TYPE_CHECKING); + properties.add(VALIDATION_DETAILS_ATTRIBUTE_NAME); + properties.add(MAX_VALIDATION_DETAILS_LENGTH); return properties; } @@ -350,7 +372,7 @@ public class ValidateRecord extends AbstractProcessor { } if (validWriter != null) { - completeFlowFile(session, validFlowFile, validWriter, REL_VALID, null); + completeFlowFile(context, session, validFlowFile, validWriter, REL_VALID, null); } if (invalidWriter != null) { @@ -389,7 +411,7 @@ public class ValidateRecord extends AbstractProcessor { } final String validationErrorString = errorBuilder.toString(); - completeFlowFile(session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString); + completeFlowFile(context, session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString); } } finally { closeQuietly(validWriter); @@ -424,14 +446,32 @@ public class ValidateRecord extends AbstractProcessor { } } - private void completeFlowFile(final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, final Relationship relationship, final String details) throws IOException { + private void completeFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, + final Relationship relationship, final String details) throws IOException { final WriteResult writeResult = writer.finishRecordSet(); writer.close(); + final String validationDetailsAttributeName = context.getProperty(VALIDATION_DETAILS_ATTRIBUTE_NAME) + .evaluateAttributeExpressions(flowFile).getValue(); + + final Integer maxValidationDetailsLength = context.getProperty(MAX_VALIDATION_DETAILS_LENGTH).evaluateAttributeExpressions(flowFile).asInteger(); + final Map<String, String> attributes = new HashMap<>(); attributes.putAll(writeResult.getAttributes()); attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + + if(validationDetailsAttributeName != null && details != null && !details.isEmpty()) { + String truncatedDetails = details; + + //Truncating only when it exceeds the configured maximum + if (truncatedDetails.length() > maxValidationDetailsLength) { + truncatedDetails = truncatedDetails.substring(0, maxValidationDetailsLength); + } + + attributes.put(validationDetailsAttributeName, truncatedDetails); + } + session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, relationship); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java index 74c3a17..895e3de 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java @@ -594,4 +594,50 @@ public class TestValidateRecord { } } + @Test + public void testValidationsDetailsAttributeForInvalidRecords() throws InitializationException, UnsupportedEncodingException, IOException { + final String schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc")), "UTF-8"); + + final CSVReader csvReader = new CSVReader(); + runner.addControllerService("reader", csvReader); + runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, schema); + runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "false"); + runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue()); + runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false"); + runner.enableControllerService(csvReader); + + final MockRecordWriter validWriter = new MockRecordWriter("valid", false); + runner.addControllerService("writer", validWriter); + runner.enableControllerService(validWriter); + + final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true); + runner.addControllerService("invalid-writer", invalidWriter); + runner.enableControllerService(invalidWriter); + + runner.setProperty(ValidateRecord.RECORD_READER, "reader"); + runner.setProperty(ValidateRecord.RECORD_WRITER, "writer"); + runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer"); + runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false"); + runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150"); + runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails"); + + final String content = "1, John Doe\n" + + "2, Jane Doe\n" + + "Three, Jack Doe\n"; + + runner.enqueue(content); + runner.run(); + + runner.assertTransferCount(ValidateRecord.REL_INVALID, 1); + runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0); + + final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0); + invalidFlowFile.assertAttributeEquals("record.count", "1"); + invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n"); + invalidFlowFile.assertAttributeExists("valDetails"); + invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; " + + "The following 1 fields had values whose type did not match the schema: [/id]"); + } + }