Repository: nifi
Updated Branches:
  refs/heads/master c51512f5e -> 0efddf47d
NIFI-4579: Fix ValidateRecord type coercing Signed-off-by: Matthew Burgess <[email protected]> This closes #2794 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0efddf47 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0efddf47 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0efddf47 Branch: refs/heads/master Commit: 0efddf47d516b62d7d9c61142d20ce40bcec675f Parents: c51512f Author: Koji Kawamura <[email protected]> Authored: Thu Jun 14 15:39:17 2018 +0900 Committer: Matthew Burgess <[email protected]> Committed: Thu Dec 13 16:10:41 2018 -0500 ---------------------------------------------------------------------- .../processors/standard/ValidateRecord.java | 18 +- .../processors/standard/TestValidateRecord.java | 223 +++++++++++++++++++ 2 files changed, 234 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0efddf47/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index 52f462a..bc39ecf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -109,7 +109,9 @@ public class ValidateRecord extends AbstractProcessor { static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() .name("record-writer") 
.displayName("Record Writer") - .description("Specifies the Controller Service to use for writing out the records") + .description("Specifies the Controller Service to use for writing out the records. " + + "Regardless of the Controller Service schema access configuration, " + + "the schema that is used to validate record is used to write the valid results.") .identifiesControllerService(RecordSetWriterFactory.class) .required(true) .build(); @@ -117,7 +119,8 @@ public class ValidateRecord extends AbstractProcessor { .name("invalid-record-writer") .displayName("Record Writer for Invalid Records") .description("If specified, this Controller Service will be used to write out any records that are invalid. " - + "If not specified, the writer specified by the \"Record Writer\" property will be used. This is useful, for example, when the configured " + + "If not specified, the writer specified by the \"Record Writer\" property will be used with the schema used to read the input records. " + + "This is useful, for example, when the configured " + "Record Writer cannot write data that does not adhere to its schema (as is the case with Avro) or when it is desirable to keep invalid records " + "in their original format while converting valid records to another format.") .identifiesControllerService(RecordSetWriterFactory.class) @@ -161,7 +164,7 @@ public class ValidateRecord extends AbstractProcessor { .displayName("Allow Extra Fields") .description("If the incoming data has fields that are not present in the schema, this property determines whether or not the Record is valid. " + "If true, the Record is still valid. 
If false, the Record will be invalid due to the extra fields.") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) @@ -172,7 +175,7 @@ public class ValidateRecord extends AbstractProcessor { .description("If the incoming data has a Record where a field is not of the correct type, this property determine whether how to handle the Record. " + "If true, the Record will still be considered invalid. If false, the Record will be considered valid and the field will be coerced into the " + "correct type (if possible, according to the type coercion supported by the Record Writer).") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) @@ -292,7 +295,8 @@ public class ValidateRecord extends AbstractProcessor { validFlowFile = session.create(flowFile); } - validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, record.getSchema()); + validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema); + } else { invalidCount++; logValidationErrors(flowFile, recordCount, result); @@ -435,13 +439,13 @@ public class ValidateRecord extends AbstractProcessor { } private RecordSetWriter createIfNecessary(final RecordSetWriter writer, final RecordSetWriterFactory factory, final ProcessSession session, - final FlowFile flowFile, final RecordSchema inputSchema) throws SchemaNotFoundException, IOException { + final FlowFile flowFile, final RecordSchema outputSchema) throws SchemaNotFoundException, IOException { if (writer != null) { return writer; } final OutputStream out = session.write(flowFile); - final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, out); + final RecordSetWriter created = factory.createWriter(getLogger(), 
outputSchema, out); created.beginRecordSet(); return created; } http://git-wip-us.apache.org/repos/asf/nifi/blob/0efddf47/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java index 728875b..fdb5be4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java @@ -17,23 +17,38 @@ package org.apache.nifi.processors.standard; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Optional; +import org.apache.nifi.avro.AvroReader; +import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.csv.CSVReader; import org.apache.nifi.csv.CSVRecordSetWriter; import org.apache.nifi.csv.CSVUtils; +import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import 
org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestValidateRecord { private TestRunner runner; @@ -137,4 +152,212 @@ public class TestValidateRecord { invalidFlowFile.assertAttributeEquals("record.count", "1"); invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n"); } + + @Test + public void testStrictTypeCheck() throws InitializationException, IOException { + final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8"); + + final CSVReader csvReader = new CSVReader(); + runner.addControllerService("reader", csvReader); + runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived"); + runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true"); + runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true"); + runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue()); + runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false"); + runner.enableControllerService(csvReader); + + final JsonRecordSetWriter validWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", validWriter); + runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(validWriter); + + final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true); + runner.addControllerService("invalid-writer", invalidWriter); + runner.enableControllerService(invalidWriter); + + runner.setProperty(ValidateRecord.RECORD_READER, "reader"); + runner.setProperty(ValidateRecord.RECORD_WRITER, "writer"); + 
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema); + runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer"); + runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false"); + runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true"); + + // The validationSchema expects 'id' to be int, but CSVReader reads it as 'string' + // with strict type check, the type difference is not allowed. + final String content = "id, firstName, lastName\n" + + "1, John, Doe\n" + + "2, Jane, Doe\n" + + "Three, Jack, Doe\n"; + + runner.enqueue(content); + runner.run(); + + runner.assertTransferCount(ValidateRecord.REL_VALID, 0); + runner.assertTransferCount(ValidateRecord.REL_INVALID, 1); + runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0); + + final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0); + invalidFlowFile.assertAttributeEquals("record.count", "3"); + final String expectedInvalidContents = "invalid\n" + + "\"1\",\"John\",\"Doe\"\n" + + "\"2\",\"Jane\",\"Doe\"\n" + + "\"Three\",\"Jack\",\"Doe\"\n"; + invalidFlowFile.assertContentEquals(expectedInvalidContents); + } + + @Test + public void testNonStrictTypeCheckWithAvroWriter() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException { + final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8"); + + final CSVReader csvReader = new CSVReader(); + runner.addControllerService("reader", csvReader); + runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived"); + runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true"); + runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true"); + runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, 
CSVUtils.QUOTE_MINIMAL.getValue()); + runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false"); + runner.enableControllerService(csvReader); + + final AvroRecordSetWriter validWriter = new AvroRecordSetWriter(); + runner.addControllerService("writer", validWriter); + runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(validWriter); + + final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true); + runner.addControllerService("invalid-writer", invalidWriter); + runner.enableControllerService(invalidWriter); + + runner.setProperty(ValidateRecord.RECORD_READER, "reader"); + runner.setProperty(ValidateRecord.RECORD_WRITER, "writer"); + runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema); + runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer"); + runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false"); + runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false"); + + // The validationSchema expects 'id' to be int, but CSVReader reads it as 'string' + // with non-strict type check, the type difference should be accepted, and results should be written as 'int'. 
+ final String content = "id, firstName, lastName\n" + + "1, John, Doe\n" + + "2, Jane, Doe\n" + + "Three, Jack, Doe\n"; + + runner.enqueue(content); + runner.run(); + + runner.assertTransferCount(ValidateRecord.REL_VALID, 1); + runner.assertTransferCount(ValidateRecord.REL_INVALID, 1); + runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0); + + final AvroReader avroReader = new AvroReader(); + runner.addControllerService("avroReader", avroReader); + runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.enableControllerService(avroReader); + final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0); + try ( + final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFile.toByteArray()); + final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, runner.getLogger()); + ) { + final RecordSchema resultSchema = recordReader.getSchema(); + assertEquals(3, resultSchema.getFieldCount()); + + // The id field should be an int field. 
+ final Optional<RecordField> idField = resultSchema.getField("id"); + assertTrue(idField.isPresent()); + assertEquals(RecordFieldType.INT, idField.get().getDataType().getFieldType()); + + validFlowFile.assertAttributeEquals("record.count", "2"); + + Record record = recordReader.nextRecord(); + assertEquals(1, record.getValue("id")); + assertEquals("John", record.getValue("firstName")); + assertEquals("Doe", record.getValue("lastName")); + + record = recordReader.nextRecord(); + assertEquals(2, record.getValue("id")); + assertEquals("Jane", record.getValue("firstName")); + assertEquals("Doe", record.getValue("lastName")); + } + + final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0); + invalidFlowFile.assertAttributeEquals("record.count", "1"); + final String expectedInvalidContents = "invalid\n" + + "\"Three\",\"Jack\",\"Doe\"\n"; + invalidFlowFile.assertContentEquals(expectedInvalidContents); + } + + /** + * This test case demonstrates the limitation on JsonRecordSetWriter type-coercing when strict type check is disabled. + * Since WriteJsonResult.writeRawRecord doesn't use record schema, + * type coercing does not happen with JsonWriter even if strict type check is disabled. + * + * E.g. 
When an input "1" as string is given, and output field schema is int: + * <ul> + * <li>Expected result: "id": 1 (without quote)</li> + * <li>Actual result: "id": "1" (with quote)</li> + * </ul> + */ + @Test + public void testNonStrictTypeCheckWithJsonWriter() throws InitializationException, IOException { + final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8"); + + final CSVReader csvReader = new CSVReader(); + runner.addControllerService("reader", csvReader); + runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived"); + runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true"); + runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true"); + runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue()); + runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false"); + runner.enableControllerService(csvReader); + + final JsonRecordSetWriter validWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", validWriter); + runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(validWriter); + + final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true); + runner.addControllerService("invalid-writer", invalidWriter); + runner.enableControllerService(invalidWriter); + + runner.setProperty(ValidateRecord.RECORD_READER, "reader"); + runner.setProperty(ValidateRecord.RECORD_WRITER, "writer"); + runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema); + runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer"); + runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false"); + runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false"); + + // The 
validationSchema expects 'id' to be int, but CSVReader reads it as 'string' + // with non-strict type check, the type difference should be accepted, and results should be written as 'int'. + final String content = "id, firstName, lastName\n" + + "1, John, Doe\n" + + "2, Jane, Doe\n" + + "Three, Jack, Doe\n"; + + runner.enqueue(content); + runner.run(); + + runner.assertTransferCount(ValidateRecord.REL_VALID, 1); + runner.assertTransferCount(ValidateRecord.REL_INVALID, 1); + runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0); + + /* + TODO: JsonRecordSetWriter does not coerce value. Should we fix this?? + */ + final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0); + validFlowFile.assertAttributeEquals("record.count", "2"); + final String expectedValidContents = "[" + + "{\"id\":\"1\",\"firstName\":\"John\",\"lastName\":\"Doe\"}," + + "{\"id\":\"2\",\"firstName\":\"Jane\",\"lastName\":\"Doe\"}" + + "]"; + validFlowFile.assertContentEquals(expectedValidContents); + + final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0); + invalidFlowFile.assertAttributeEquals("record.count", "1"); + final String expectedInvalidContents = "invalid\n" + + "\"Three\",\"Jack\",\"Doe\"\n"; + invalidFlowFile.assertContentEquals(expectedInvalidContents); + } + }
