Repository: nifi
Updated Branches:
  refs/heads/master c51512f5e -> 0efddf47d


NIFI-4579: Fix ValidateRecord type coercion

Signed-off-by: Matthew Burgess <[email protected]>

This closes #2794


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0efddf47
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0efddf47
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0efddf47

Branch: refs/heads/master
Commit: 0efddf47d516b62d7d9c61142d20ce40bcec675f
Parents: c51512f
Author: Koji Kawamura <[email protected]>
Authored: Thu Jun 14 15:39:17 2018 +0900
Committer: Matthew Burgess <[email protected]>
Committed: Thu Dec 13 16:10:41 2018 -0500

----------------------------------------------------------------------
 .../processors/standard/ValidateRecord.java     |  18 +-
 .../processors/standard/TestValidateRecord.java | 223 +++++++++++++++++++
 2 files changed, 234 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0efddf47/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
index 52f462a..bc39ecf 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
@@ -109,7 +109,9 @@ public class ValidateRecord extends AbstractProcessor {
     static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
         .name("record-writer")
         .displayName("Record Writer")
-        .description("Specifies the Controller Service to use for writing out 
the records")
+        .description("Specifies the Controller Service to use for writing out 
the records. "
+            + "Regardless of the Controller Service schema access 
configuration, "
+            + "the schema that is used to validate record is used to write the 
valid results.")
         .identifiesControllerService(RecordSetWriterFactory.class)
         .required(true)
         .build();
@@ -117,7 +119,8 @@ public class ValidateRecord extends AbstractProcessor {
         .name("invalid-record-writer")
         .displayName("Record Writer for Invalid Records")
         .description("If specified, this Controller Service will be used to 
write out any records that are invalid. "
-            + "If not specified, the writer specified by the \"Record Writer\" 
property will be used. This is useful, for example, when the configured "
+            + "If not specified, the writer specified by the \"Record Writer\" 
property will be used with the schema used to read the input records. "
+            + "This is useful, for example, when the configured "
             + "Record Writer cannot write data that does not adhere to its 
schema (as is the case with Avro) or when it is desirable to keep invalid 
records "
             + "in their original format while converting valid records to 
another format.")
         .identifiesControllerService(RecordSetWriterFactory.class)
@@ -161,7 +164,7 @@ public class ValidateRecord extends AbstractProcessor {
         .displayName("Allow Extra Fields")
         .description("If the incoming data has fields that are not present in 
the schema, this property determines whether or not the Record is valid. "
             + "If true, the Record is still valid. If false, the Record will 
be invalid due to the extra fields.")
-        .expressionLanguageSupported(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
@@ -172,7 +175,7 @@ public class ValidateRecord extends AbstractProcessor {
         .description("If the incoming data has a Record where a field is not 
of the correct type, this property determine whether how to handle the Record. "
             + "If true, the Record will still be considered invalid. If false, 
the Record will be considered valid and the field will be coerced into the "
             + "correct type (if possible, according to the type coercion 
supported by the Record Writer).")
-        .expressionLanguageSupported(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
@@ -292,7 +295,8 @@ public class ValidateRecord extends AbstractProcessor {
                             validFlowFile = session.create(flowFile);
                         }
 
-                        validWriter = writer = createIfNecessary(validWriter, 
validRecordWriterFactory, session, validFlowFile, record.getSchema());
+                        validWriter = writer = createIfNecessary(validWriter, 
validRecordWriterFactory, session, validFlowFile, validationSchema);
+
                     } else {
                         invalidCount++;
                         logValidationErrors(flowFile, recordCount, result);
@@ -435,13 +439,13 @@ public class ValidateRecord extends AbstractProcessor {
     }
 
     private RecordSetWriter createIfNecessary(final RecordSetWriter writer, 
final RecordSetWriterFactory factory, final ProcessSession session,
-        final FlowFile flowFile, final RecordSchema inputSchema) throws 
SchemaNotFoundException, IOException {
+        final FlowFile flowFile, final RecordSchema outputSchema) throws 
SchemaNotFoundException, IOException {
         if (writer != null) {
             return writer;
         }
 
         final OutputStream out = session.write(flowFile);
-        final RecordSetWriter created = factory.createWriter(getLogger(), 
inputSchema, out);
+        final RecordSetWriter created = factory.createWriter(getLogger(), 
outputSchema, out);
         created.beginRecordSet();
         return created;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0efddf47/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 728875b..fdb5be4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -17,23 +17,38 @@
 
 package org.apache.nifi.processors.standard;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Optional;
 
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroRecordSetWriter;
 import org.apache.nifi.csv.CSVReader;
 import org.apache.nifi.csv.CSVRecordSetWriter;
 import org.apache.nifi.csv.CSVUtils;
+import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestValidateRecord {
 
     private TestRunner runner;
@@ -137,4 +152,212 @@ public class TestValidateRecord {
         invalidFlowFile.assertAttributeEquals("record.count", "1");
         invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack 
Doe\"\n");
     }
+
+    @Test
+    public void testStrictTypeCheck() throws InitializationException, 
IOException {
+        final String validateSchema = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")),
 "UTF-8");
+
+        final CSVReader csvReader = new CSVReader();
+        runner.addControllerService("reader", csvReader);
+        runner.setProperty(csvReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
+        runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
+        runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
+        runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, 
CSVUtils.QUOTE_MINIMAL.getValue());
+        runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
+        runner.enableControllerService(csvReader);
+
+        final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", validWriter);
+        runner.setProperty(validWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(validWriter);
+
+        final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", 
true);
+        runner.addControllerService("invalid-writer", invalidWriter);
+        runner.enableControllerService(invalidWriter);
+
+        runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+        runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
+        runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, 
"invalid-writer");
+        runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+        runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
+
+        // The validationSchema expects 'id' to be int, but CSVReader reads it 
as 'string'
+        // with strict type check, the type difference is not allowed.
+        final String content = "id, firstName, lastName\n"
+                + "1, John, Doe\n"
+                + "2, Jane, Doe\n"
+                + "Three, Jack, Doe\n";
+
+        runner.enqueue(content);
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_VALID, 0);
+        runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+        runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
+
+        final MockFlowFile invalidFlowFile = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+        invalidFlowFile.assertAttributeEquals("record.count", "3");
+        final String expectedInvalidContents = "invalid\n"
+                + "\"1\",\"John\",\"Doe\"\n"
+                + "\"2\",\"Jane\",\"Doe\"\n"
+                + "\"Three\",\"Jack\",\"Doe\"\n";
+        invalidFlowFile.assertContentEquals(expectedInvalidContents);
+    }
+
+    @Test
+    public void testNonStrictTypeCheckWithAvroWriter() throws 
InitializationException, IOException, MalformedRecordException, 
SchemaNotFoundException {
+        final String validateSchema = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")),
 "UTF-8");
+
+        final CSVReader csvReader = new CSVReader();
+        runner.addControllerService("reader", csvReader);
+        runner.setProperty(csvReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
+        runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
+        runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
+        runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, 
CSVUtils.QUOTE_MINIMAL.getValue());
+        runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
+        runner.enableControllerService(csvReader);
+
+        final AvroRecordSetWriter validWriter = new AvroRecordSetWriter();
+        runner.addControllerService("writer", validWriter);
+        runner.setProperty(validWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(validWriter);
+
+        final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", 
true);
+        runner.addControllerService("invalid-writer", invalidWriter);
+        runner.enableControllerService(invalidWriter);
+
+        runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+        runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
+        runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, 
"invalid-writer");
+        runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+        runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
+
+        // The validationSchema expects 'id' to be int, but CSVReader reads it 
as 'string'
+        // with non-strict type check, the type difference should be accepted, 
and results should be written as 'int'.
+        final String content = "id, firstName, lastName\n"
+                + "1, John, Doe\n"
+                + "2, Jane, Doe\n"
+                + "Three, Jack, Doe\n";
+
+        runner.enqueue(content);
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+        runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+        runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
+
+        final AvroReader avroReader = new AvroReader();
+        runner.addControllerService("avroReader", avroReader);
+        runner.setProperty(avroReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.enableControllerService(avroReader);
+        final MockFlowFile validFlowFile = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+        try (
+        final ByteArrayInputStream resultContentStream = new 
ByteArrayInputStream(validFlowFile.toByteArray());
+        final RecordReader recordReader = 
avroReader.createRecordReader(validFlowFile.getAttributes(), 
resultContentStream, runner.getLogger());
+        ) {
+            final RecordSchema resultSchema = recordReader.getSchema();
+            assertEquals(3, resultSchema.getFieldCount());
+
+            // The id field should be an int field.
+            final Optional<RecordField> idField = resultSchema.getField("id");
+            assertTrue(idField.isPresent());
+            assertEquals(RecordFieldType.INT, 
idField.get().getDataType().getFieldType());
+
+            validFlowFile.assertAttributeEquals("record.count", "2");
+
+            Record record = recordReader.nextRecord();
+            assertEquals(1, record.getValue("id"));
+            assertEquals("John", record.getValue("firstName"));
+            assertEquals("Doe", record.getValue("lastName"));
+
+            record = recordReader.nextRecord();
+            assertEquals(2, record.getValue("id"));
+            assertEquals("Jane", record.getValue("firstName"));
+            assertEquals("Doe", record.getValue("lastName"));
+        }
+
+        final MockFlowFile invalidFlowFile = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+        invalidFlowFile.assertAttributeEquals("record.count", "1");
+        final String expectedInvalidContents = "invalid\n"
+                + "\"Three\",\"Jack\",\"Doe\"\n";
+        invalidFlowFile.assertContentEquals(expectedInvalidContents);
+    }
+
+    /**
+     * This test case demonstrates the limitation on JsonRecordSetWriter 
type-coercing when strict type check is disabled.
+     * Since WriteJsonResult.writeRawRecord doesn't use record schema,
+     * type coercing does not happen with JsonWriter even if strict type check 
is disabled.
+     *
+     * E.g. When an input "1" as string is given, and output field schema is 
int:
+     * <ul>
+     * <li>Expected result: "id": 1 (without quote)</li>
+     * <li>Actual result: "id": "1" (with quote)</li>
+     * </ul>
+     */
+    @Test
+    public void testNonStrictTypeCheckWithJsonWriter() throws 
InitializationException, IOException {
+        final String validateSchema = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")),
 "UTF-8");
+
+        final CSVReader csvReader = new CSVReader();
+        runner.addControllerService("reader", csvReader);
+        runner.setProperty(csvReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
+        runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
+        runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
+        runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, 
CSVUtils.QUOTE_MINIMAL.getValue());
+        runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
+        runner.enableControllerService(csvReader);
+
+        final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", validWriter);
+        runner.setProperty(validWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(validWriter);
+
+        final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", 
true);
+        runner.addControllerService("invalid-writer", invalidWriter);
+        runner.enableControllerService(invalidWriter);
+
+        runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+        runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
+        runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, 
"invalid-writer");
+        runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+        runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
+
+        // The validationSchema expects 'id' to be int, but CSVReader reads it 
as 'string'
+        // with non-strict type check, the type difference should be accepted, 
and results should be written as 'int'.
+        final String content = "id, firstName, lastName\n"
+                + "1, John, Doe\n"
+                + "2, Jane, Doe\n"
+                + "Three, Jack, Doe\n";
+
+        runner.enqueue(content);
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+        runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+        runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
+
+        /*
+        TODO: JsonRecordSetWriter does not coerce value. Should we fix this??
+         */
+        final MockFlowFile validFlowFile = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+        validFlowFile.assertAttributeEquals("record.count", "2");
+        final String expectedValidContents = "[" +
+                "{\"id\":\"1\",\"firstName\":\"John\",\"lastName\":\"Doe\"}," +
+                "{\"id\":\"2\",\"firstName\":\"Jane\",\"lastName\":\"Doe\"}" +
+                "]";
+        validFlowFile.assertContentEquals(expectedValidContents);
+
+        final MockFlowFile invalidFlowFile = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+        invalidFlowFile.assertAttributeEquals("record.count", "1");
+        final String expectedInvalidContents = "invalid\n"
+                + "\"Three\",\"Jack\",\"Doe\"\n";
+        invalidFlowFile.assertContentEquals(expectedInvalidContents);
+    }
+
 }

Reply via email to