http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
index b47bece..9a9fed7 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
@@ -33,6 +33,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -74,7 +75,7 @@ public class MockRecordParser extends 
AbstractControllerService implements Recor
             }
 
             @Override
-            public Record nextRecord() throws IOException, 
MalformedRecordException {
+            public Record nextRecord(boolean coerceTypes, boolean dropUnknown) 
throws IOException, MalformedRecordException, SchemaValidationException {
                 if (failAfterN >= recordCount) {
                     throw new MalformedRecordException("Intentional Unit Test 
Exception because " + recordCount + " records have been read");
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
index 2be37df..5e6a9ca 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
@@ -44,7 +44,7 @@ class GroovyRecordReader implements RecordReader {
             new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 
300])
     ].iterator()
 
-    Record nextRecord() throws IOException, MalformedRecordException {
+    // Returns the next record from recordIterator, or null once the iterator is exhausted.
+    // Neither coerceTypes nor dropUnknown is consulted by this test reader.
+    Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws 
IOException, MalformedRecordException {
         return recordIterator.hasNext() ? recordIterator.next() : null
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
index d51089b..b94c380 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
@@ -51,7 +51,7 @@ class GroovyXmlRecordReader implements RecordReader {
         }.iterator()
     }
 
-    Record nextRecord() throws IOException, MalformedRecordException {
+    // Returns the next record from recordIterator, or null when the iterator is null or exhausted
+    // (the safe-navigation call makes a null iterator behave like an empty one).
+    // Neither coerceTypes nor dropUnknown is consulted by this test reader.
+    Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws 
IOException, MalformedRecordException {
         return recordIterator?.hasNext() ? recordIterator.next() : null
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 836e83d..19f47be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -1,15 +1,16 @@
 <?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
-    license agreements. See the NOTICE file distributed with this work for 
additional
-    information regarding copyright ownership. The ASF licenses this file to
-    You under the Apache License, Version 2.0 (the "License"); you may not use
-    this file except in compliance with the License. You may obtain a copy of
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-    by applicable law or agreed to in writing, software distributed under the
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
-    OF ANY KIND, either express or implied. See the License for the specific
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+    license agreements. See the NOTICE file distributed with this work for 
additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    You under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
     language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -356,6 +357,11 @@
             <artifactId>calcite-core</artifactId>
             <version>1.12.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -498,7 +504,8 @@
                         
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
                         
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude>
                         
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
-                        <!-- This file is copied from 
https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 
and we must support Java 7 -->
+                        <!-- This file is copied from 
https://github.com/jeremyh/jBCrypt 
+                            because the binary is compiled for Java 8 and we 
must support Java 7 -->
                         
<exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>
                     </excludes>
                 </configuration>

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
new file mode 100644
index 0000000..8e8fca8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.validation.SchemaValidationContext;
+import org.apache.nifi.schema.validation.StandardSchemaValidator;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RawRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
+import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
+import org.apache.nifi.serialization.record.validation.ValidationError;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"record", "schema", "validate"})
+@CapabilityDescription("Validates the Records of an incoming FlowFile against 
a given schema. All records that adhere to the schema are routed to the 
\"valid\" relationship while "
+    + "records that do not adhere to hte schema are routed to the \"invalid\" 
relationship. It is therefore possible for a single incoming FlowFile to be 
split into two individual "
+    + "FlowFiles if some records are valid according to the schema and others 
are not. Any FlowFile that is routed to the \"invalid\" relationship will emit 
a ROUTE Provenance Event "
+    + "with the Details field populated to explain why records were invalid. 
In addition, to gain further explanation of why records were invalid, 
DEBUG-level logging can be enabled "
+    + "for the \"org.apache.nifi.processors.standard.ValidateRecord\" logger.")
+public class ValidateRecord extends AbstractProcessor {
+
+    // Allowable values of the Schema Access Strategy property. getValidationSchema() compares the
+    // configured value against each constant's getValue() to decide where the schema comes from.
+
+    // Look the schema up by name in the configured Schema Registry.
+    static final AllowableValue SCHEMA_NAME_PROPERTY = new 
AllowableValue("schema-name-property", "Use Schema Name Property",
+        "The schema to validate the data against is determined by looking at 
the 'Schema Name' Property and looking up the schema in the configured Schema 
Registry");
+    // Parse the value of the Schema Text property as an Avro schema.
+    static final AllowableValue SCHEMA_TEXT_PROPERTY = new 
AllowableValue("schema-text-property", "Use Schema Text Property",
+        "The schema to validate the data against is determined by looking at 
the 'Schema Text' Property and parsing the schema as an Avro schema");
+    // Use the schema reported by the Record Reader; this is the default strategy.
+    static final AllowableValue READER_SCHEMA = new 
AllowableValue("reader-schema", "Use Reader's Schema",
+        "The schema to validate the data against is determined by asking the 
configured Record Reader for its schema");
+
+    // Required controller service that onTrigger() uses to create the RecordReader for the incoming FlowFile.
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading 
incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+    // Required controller service that creates the writers for the "valid" and "invalid" FlowFiles.
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out 
the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    // Selects the source of the validation schema; defaults to the reader's own schema.
+    static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("schema-access-strategy")
+        .displayName("Schema Access Strategy")
+        .description("Specifies how to obtain the schema that should be used 
to validate records")
+        .allowableValues(READER_SCHEMA, SCHEMA_NAME_PROPERTY, 
SCHEMA_TEXT_PROPERTY)
+        .defaultValue(READER_SCHEMA.getValue())
+        .required(true)
+        .build();
+    // Optional at the descriptor level; customValidate() requires it when the strategy is SCHEMA_NAME_PROPERTY.
+    // NOTE(review): this is the only descriptor declared public - confirm whether that is intentional.
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new 
PropertyDescriptor.Builder()
+        .name("schema-registry")
+        .displayName("Schema Registry")
+        .description("Specifies the Controller Service to use for the Schema 
Registry. This is necessary only if the Schema Access Strategy is set to \"Use 
'Schema Name' Property\".")
+        .identifiesControllerService(SchemaRegistry.class)
+        .required(false)
+        .build();
+    // Name passed to the Schema Registry; evaluated against the FlowFile's attributes in getValidationSchema().
+    static final PropertyDescriptor SCHEMA_NAME = new 
PropertyDescriptor.Builder()
+        .name("schema-name")
+        .displayName("Schema Name")
+        .description("Specifies the name of the schema to lookup in the Schema 
Registry property")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("${schema.name}")
+        .required(false)
+        .build();
+    // Avro schema text; evaluated against the FlowFile's attributes and parsed in getValidationSchema().
+    static final PropertyDescriptor SCHEMA_TEXT = new 
PropertyDescriptor.Builder()
+        .name("schema-text")
+        .displayName("Schema Text")
+        .description("The text of an Avro-formatted Schema")
+        .addValidator(new AvroSchemaValidator())
+        .expressionLanguageSupported(true)
+        .defaultValue("${avro.schema}")
+        .required(false)
+        .build();
+    // Boolean flag handed to the SchemaValidationContext built in onTrigger(); defaults to "true".
+    static final PropertyDescriptor ALLOW_EXTRA_FIELDS = new 
PropertyDescriptor.Builder()
+        .name("allow-extra-fields")
+        .displayName("Allow Extra Fields")
+        .description("If the incoming data has fields that are not present in 
the schema, this property determines whether or not the Record is valid. "
+            + "If true, the Record is still valid. If false, the Record will 
be invalid due to the extra fields.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+    // Boolean flag handed to the SchemaValidationContext built in onTrigger(); defaults to "true".
+    // The description previously read "this property determine whether how to handle the Record".
+    static final PropertyDescriptor STRICT_TYPE_CHECKING = new PropertyDescriptor.Builder()
+        .name("strict-type-checking")
+        .displayName("Strict Type Checking")
+        .description("If the incoming data has a Record where a field is not of the correct type, this property determines how to handle the Record. "
+            + "If true, the Record will still be considered invalid. If false, the Record will be considered valid and the field will be coerced into the "
+            + "correct type (if possible, according to the type coercion supported by the Record Writer).")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+
+    // Receives the child FlowFile holding the records that passed validation.
+    static final Relationship REL_VALID = new Relationship.Builder()
+        .name("valid")
+        .description("Records that are valid according to the schema will be 
routed to this relationship")
+        .build();
+    // Receives the child FlowFile holding the records that failed validation.
+    static final Relationship REL_INVALID = new Relationship.Builder()
+        .name("invalid")
+        .description("Records that are not valid according to the schema will 
be routed to this relationship")
+        .build();
+    // Receives the original FlowFile when onTrigger() catches an IOException,
+    // MalformedRecordException or SchemaNotFoundException.
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If the records cannot be read, validated, or written, 
for any reason, the original FlowFile will be routed to this relationship")
+        .build();
+
+
+    /**
+     * Lists the properties exposed by this processor, in display order.
+     *
+     * @return a new, mutable list holding the eight supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        Collections.addAll(descriptors,
+            RECORD_READER,
+            RECORD_WRITER,
+            SCHEMA_ACCESS_STRATEGY,
+            SCHEMA_REGISTRY,
+            SCHEMA_NAME,
+            SCHEMA_TEXT,
+            ALLOW_EXTRA_FIELDS,
+            STRICT_TYPE_CHECKING);
+        return descriptors;
+    }
+
+    /**
+     * Lists the relationships this processor can route FlowFiles to.
+     *
+     * @return a new, mutable set containing the valid, invalid and failure relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> routes = new HashSet<>();
+        Collections.addAll(routes, REL_VALID, REL_INVALID, REL_FAILURE);
+        return routes;
+    }
+
+    /**
+     * Cross-property validation: when the schema is looked up by name, a Schema Registry must be
+     * configured and that registry must list SCHEMA_NAME among the schema fields it supplies.
+     *
+     * @param validationContext the context giving access to the configured property values
+     * @return a single invalid result describing the first problem found, or an empty list when
+     *         the configuration is acceptable or a different schema access strategy is selected
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final String strategy = validationContext.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+        if (!strategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
+            // The registry is only consulted for the "schema name" strategy.
+            return Collections.emptyList();
+        }
+
+        String problem = null;
+        if (!validationContext.getProperty(SCHEMA_REGISTRY).isSet()) {
+            problem = "If the Schema Access Strategy is set to \"Use 'Schema Name' Property\", the Schema Registry property must also be set";
+        } else {
+            final SchemaRegistry registry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+            if (!registry.getSuppliedSchemaFields().contains(SchemaField.SCHEMA_NAME)) {
+                problem = "The configured Schema Registry does not support accessing schemas by name";
+            }
+        }
+
+        if (problem == null) {
+            return Collections.emptyList();
+        }
+
+        return Collections.singleton(new ValidationResult.Builder()
+            .subject("Schema Registry")
+            .valid(false)
+            .explanation(problem)
+            .build());
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+        final boolean allowExtraFields = 
context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
+        final boolean strictTypeChecking = 
context.getProperty(STRICT_TYPE_CHECKING).asBoolean();
+
+        RecordSetWriter validWriter = null;
+        RecordSetWriter invalidWriter = null;
+        FlowFile validFlowFile = null;
+        FlowFile invalidFlowFile = null;
+
+        try (final InputStream in = session.read(flowFile);
+            final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            final RecordSchema validationSchema = getValidationSchema(context, 
flowFile, reader);
+            final SchemaValidationContext validationContext = new 
SchemaValidationContext(validationSchema, allowExtraFields, strictTypeChecking);
+            final RecordSchemaValidator validator = new 
StandardSchemaValidator(validationContext);
+
+            int recordCount = 0;
+            int validCount = 0;
+            int invalidCount = 0;
+
+            final Set<String> extraFields = new HashSet<>();
+            final Set<String> missingFields = new HashSet<>();
+            final Set<String> invalidFields = new HashSet<>();
+            final Set<String> otherProblems = new HashSet<>();
+
+            try {
+                Record record;
+                while ((record = reader.nextRecord(false, false)) != null) {
+                    final SchemaValidationResult result = 
validator.validate(record);
+                    recordCount++;
+
+                    RecordSetWriter writer;
+                    if (result.isValid()) {
+                        validCount++;
+                        if (validFlowFile == null) {
+                            validFlowFile = session.create(flowFile);
+                        }
+
+                        validWriter = writer = createIfNecessary(validWriter, 
writerFactory, session, validFlowFile, record.getSchema());
+                    } else {
+                        invalidCount++;
+                        logValidationErrors(flowFile, recordCount, result);
+
+                        if (invalidFlowFile == null) {
+                            invalidFlowFile = session.create(flowFile);
+                        }
+
+                        invalidWriter = writer = 
createIfNecessary(invalidWriter, writerFactory, session, invalidFlowFile, 
record.getSchema());
+
+                        // Add all of the validation errors to our 
Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
+                        // we keep too many then we both use up a lot of heap 
and risk outputting so much information in the Provenance Event
+                        // that it is too noisy to be useful.
+                        for (final ValidationError validationError : 
result.getValidationErrors()) {
+                            final Optional<String> fieldName = 
validationError.getFieldName();
+
+                            switch (validationError.getType()) {
+                                case EXTRA_FIELD:
+                                    if (fieldName.isPresent()) {
+                                        extraFields.add(fieldName.get());
+                                    } else {
+                                        
otherProblems.add(validationError.getExplanation());
+                                    }
+                                    break;
+                                case MISSING_FIELD:
+                                    if (fieldName.isPresent()) {
+                                        missingFields.add(fieldName.get());
+                                    } else {
+                                        
otherProblems.add(validationError.getExplanation());
+                                    }
+                                    break;
+                                case INVALID_FIELD:
+                                    if (fieldName.isPresent()) {
+                                        invalidFields.add(fieldName.get());
+                                    } else {
+                                        
otherProblems.add(validationError.getExplanation());
+                                    }
+                                    break;
+                                case OTHER:
+                                    
otherProblems.add(validationError.getExplanation());
+                                    break;
+                            }
+                        }
+                    }
+
+                    if (writer instanceof RawRecordWriter) {
+                        ((RawRecordWriter) writer).writeRawRecord(record);
+                    } else {
+                        writer.write(record);
+                    }
+                }
+
+                if (validWriter != null) {
+                    completeFlowFile(session, validFlowFile, validWriter, 
REL_VALID, null);
+                }
+
+                if (invalidWriter != null) {
+                    // Build up a String that explains why the records were 
invalid, so that we can add this to the Provenance Event.
+                    final StringBuilder errorBuilder = new StringBuilder();
+                    errorBuilder.append("Records in this FlowFile were invalid 
for the following reasons: ");
+                    if (!missingFields.isEmpty()) {
+                        errorBuilder.append("The following 
").append(missingFields.size()).append(" fields were missing: 
").append(missingFields.toString());
+                    }
+
+                    if (!extraFields.isEmpty()) {
+                        if (errorBuilder.length() > 0) {
+                            errorBuilder.append("; ");
+                        }
+
+                        errorBuilder.append("The following 
").append(extraFields.size())
+                            .append(" fields were present in the Record but 
not in the schema: ").append(extraFields.toString());
+                    }
+
+                    if (!invalidFields.isEmpty()) {
+                        if (errorBuilder.length() > 0) {
+                            errorBuilder.append("; ");
+                        }
+
+                        errorBuilder.append("The following 
").append(invalidFields.size())
+                            .append(" fields had values whose type did not 
match the schema: ").append(invalidFields.toString());
+                    }
+
+                    if (!otherProblems.isEmpty()) {
+                        if (errorBuilder.length() > 0) {
+                            errorBuilder.append("; ");
+                        }
+
+                        errorBuilder.append("The following 
").append(otherProblems.size())
+                            .append(" additional problems were encountered: 
").append(otherProblems.toString());
+                    }
+
+                    final String validationErrorString = 
errorBuilder.toString();
+                    completeFlowFile(session, invalidFlowFile, invalidWriter, 
REL_INVALID, validationErrorString);
+                }
+            } finally {
+                closeQuietly(validWriter);
+                closeQuietly(invalidWriter);
+            }
+
+            session.adjustCounter("Records Validated", recordCount, false);
+            session.adjustCounter("Records Found Valid", validCount, false);
+            session.adjustCounter("Records Found Invalid", invalidCount, 
false);
+        } catch (final IOException | MalformedRecordException | 
SchemaNotFoundException e) {
+            getLogger().error("Failed to process {}; will route to failure", 
new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            if (validFlowFile != null) {
+                session.remove(validFlowFile);
+            }
+            if (invalidFlowFile != null) {
+                session.remove(invalidFlowFile);
+            }
+            return;
+        }
+
+        session.remove(flowFile);
+    }
+
+    /**
+     * Closes the given writer, logging (rather than propagating) any failure.
+     *
+     * @param writer the writer to close; may be null, in which case nothing happens
+     */
+    private void closeQuietly(final RecordSetWriter writer) {
+        if (writer == null) {
+            return;
+        }
+
+        try {
+            writer.close();
+        } catch (final Exception closeFailure) {
+            getLogger().error("Failed to close Record Writer", closeFailure);
+        }
+    }
+
+    /**
+     * Finishes the record set, closes the writer, stamps the FlowFile with the writer's attributes
+     * plus "record.count" and the MIME type, transfers it and emits a ROUTE provenance event.
+     *
+     * @param session the session that owns the FlowFile
+     * @param flowFile the child FlowFile the writer has been writing to
+     * @param writer the writer whose record set is to be finished
+     * @param relationship the relationship to transfer the FlowFile to
+     * @param details the details text for the provenance event; the caller passes null for "valid"
+     * @throws IOException if finishing the record set or closing the writer fails
+     */
+    private void completeFlowFile(final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, final Relationship relationship, final String details) throws IOException {
+        final WriteResult writeResult = writer.finishRecordSet();
+        writer.close();
+
+        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+        // putAllAttributes returns the updated FlowFile; use that reference from here on instead of
+        // the stale one that was passed in.
+        final FlowFile updated = session.putAllAttributes(flowFile, attributes);
+
+        session.transfer(updated, relationship);
+        session.getProvenanceReporter().route(updated, relationship, details);
+    }
+
+    /**
+     * Returns the given writer if one already exists; otherwise opens an output stream on the
+     * FlowFile, creates a writer for it and begins the record set.
+     *
+     * @param writer the existing writer, or null if none has been created yet
+     * @param factory the factory used to create a new writer
+     * @param session the session used to open the FlowFile's output stream
+     * @param flowFile the FlowFile the new writer will write to
+     * @param inputSchema the schema handed to the factory when creating the writer
+     * @return the existing writer, or a newly created one whose record set has been started
+     * @throws SchemaNotFoundException if the factory cannot create the writer for that reason
+     * @throws IOException if creating the writer or beginning the record set fails
+     */
+    private RecordSetWriter createIfNecessary(final RecordSetWriter writer, final RecordSetWriterFactory factory, final ProcessSession session,
+        final FlowFile flowFile, final RecordSchema inputSchema) throws SchemaNotFoundException, IOException {
+        if (writer != null) {
+            return writer;
+        }
+
+        final OutputStream out = session.write(flowFile);
+        RecordSetWriter created = null;
+        try {
+            created = factory.createWriter(getLogger(), inputSchema, flowFile, out);
+            created.beginRecordSet();
+            return created;
+        } catch (final SchemaNotFoundException | IOException | RuntimeException e) {
+            // The caller never receives a reference on this path, so nothing else would close what
+            // was opened here. Close the writer if it exists (presumably it closes the stream it
+            // wraps - TODO confirm), otherwise close the raw stream.
+            try {
+                if (created != null) {
+                    created.close();
+                } else {
+                    out.close();
+                }
+            } catch (final Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Logs, at DEBUG level only, every validation error of one invalid record.
+     *
+     * @param flowFile the FlowFile the record came from
+     * @param recordCount the number of the record within the FlowFile, as counted by the caller
+     * @param result the validation result holding the errors to log
+     */
+    private void logValidationErrors(final FlowFile flowFile, final int recordCount, final SchemaValidationResult result) {
+        if (!getLogger().isDebugEnabled()) {
+            // Skip building the message when it would not be logged.
+            return;
+        }
+
+        final StringBuilder message = new StringBuilder();
+        message.append("For ");
+        message.append(flowFile);
+        message.append(" Record #");
+        message.append(recordCount);
+        message.append(" is invalid due to:\n");
+
+        for (final ValidationError problem : result.getValidationErrors()) {
+            message.append(problem);
+            message.append("\n");
+        }
+
+        getLogger().debug(message.toString());
+    }
+
+    /**
+     * Resolves the schema to validate against, according to the Schema Access Strategy property:
+     * the reader's own schema, a schema retrieved by name from the Schema Registry, or a schema
+     * parsed from the Schema Text property as Avro. Schema Name and Schema Text are evaluated
+     * against the FlowFile's attributes.
+     *
+     * @param context provides the configured property values
+     * @param flowFile the FlowFile whose attributes are used for Expression Language evaluation
+     * @param reader the reader asked for its schema under the "reader schema" strategy
+     * @return the schema to validate records against
+     * @throws MalformedRecordException declared for the reader's schema lookup
+     * @throws IOException declared for the reader and registry lookups
+     * @throws SchemaNotFoundException declared for the registry lookup
+     * @throws ProcessException if the configured strategy is not one of the three known values
+     */
+    protected RecordSchema getValidationSchema(final ProcessContext context, final FlowFile flowFile, final RecordReader reader)
+        throws MalformedRecordException, IOException, SchemaNotFoundException {
+        final String strategy = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+
+        if (strategy.equals(READER_SCHEMA.getValue())) {
+            return reader.getSchema();
+        }
+
+        if (strategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
+            final SchemaRegistry registry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+            final String name = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            return registry.retrieveSchema(name);
+        }
+
+        if (strategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
+            final String text = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
+            final Parser avroParser = new Schema.Parser();
+            final Schema parsed = avroParser.parse(text);
+            return AvroTypeUtil.createSchema(parsed);
+        }
+
+        throw new ProcessException("Invalid Schema Access Strategy: " + strategy);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b8eb4a1..f3e72e0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -102,6 +102,7 @@ org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml
 org.apache.nifi.processors.standard.ValidateCsv
+org.apache.nifi.processors.standard.ValidateRecord
 org.apache.nifi.processors.standard.Wait
 org.apache.nifi.processors.standard.ExecuteSQL
 org.apache.nifi.processors.standard.FetchDistributedMapCache

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
index 13a8317..c1d87b6 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -32,7 +32,7 @@ public abstract class AvroRecordReader implements 
RecordReader {
     protected abstract GenericRecord nextAvroRecord() throws IOException;
 
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
         GenericRecord record = nextAvroRecord();
         if (record == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index 1528052..135dd80 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -34,6 +34,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.DateTimeUtils;
@@ -57,6 +58,8 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile boolean firstLineIsHeader;
+    private volatile boolean ignoreHeader;
 
 
     @Override
@@ -67,7 +70,8 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
         properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
         properties.add(CSVUtils.CSV_FORMAT);
         properties.add(CSVUtils.VALUE_SEPARATOR);
-        properties.add(CSVUtils.SKIP_HEADER_LINE);
+        properties.add(CSVUtils.FIRST_LINE_IS_HEADER);
+        properties.add(CSVUtils.IGNORE_CSV_HEADER);
         properties.add(CSVUtils.QUOTE_CHAR);
         properties.add(CSVUtils.ESCAPE_CHAR);
         properties.add(CSVUtils.COMMENT_MARKER);
@@ -82,6 +86,16 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
         this.dateFormat = 
context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
         this.timeFormat = 
context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
         this.timestampFormat = 
context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+        this.firstLineIsHeader = 
context.getProperty(CSVUtils.FIRST_LINE_IS_HEADER).asBoolean();
+        this.ignoreHeader = 
context.getProperty(CSVUtils.IGNORE_CSV_HEADER).asBoolean();
+
+        // Ensure that if we are deriving schema from header that we always 
treat the first line as a header,
+        // regardless of the 'First Line is Header' property
+        final String accessStrategy = 
context.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
+        if (headerDerivedAllowableValue.getValue().equals(accessStrategy)) {
+            this.csvFormat = this.csvFormat.withFirstRecordAsHeader();
+            this.firstLineIsHeader = true;
+        }
     }
 
     @Override
@@ -92,7 +106,7 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
         final RecordSchema schema = getSchema(flowFile, new 
NonCloseableInputStream(bufferedIn), null);
         bufferedIn.reset();
 
-        return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, 
dateFormat, timeFormat, timestampFormat);
+        return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, 
firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
index f193fba..18bea6b 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -22,8 +22,13 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.text.DateFormat;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 
 import org.apache.commons.csv.CSVFormat;
@@ -36,7 +41,6 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
@@ -49,7 +53,9 @@ public class CSVRecordReader implements RecordReader {
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
 
-    public CSVRecordReader(final InputStream in, final ComponentLog logger, 
final RecordSchema schema, final CSVFormat csvFormat,
+    private List<String> rawFieldNames;
+
+    public CSVRecordReader(final InputStream in, final ComponentLog logger, 
final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, 
final boolean ignoreHeader,
         final String dateFormat, final String timeFormat, final String 
timestampFormat) throws IOException {
 
         this.schema = schema;
@@ -62,48 +68,78 @@ public class CSVRecordReader implements RecordReader {
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
 
         final Reader reader = new InputStreamReader(new BOMInputStream(in));
-        final CSVFormat withHeader = 
csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
+
+        CSVFormat withHeader;
+        if (hasHeader) {
+            withHeader = csvFormat.withSkipHeaderRecord();
+
+            if (ignoreHeader) {
+                withHeader = 
withHeader.withHeader(schema.getFieldNames().toArray(new String[0]));
+            }
+        } else {
+            withHeader = 
csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
+        }
+
         csvParser = new CSVParser(reader, withHeader);
     }
 
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
         final RecordSchema schema = getSchema();
 
+        final List<String> rawFieldNames = getRawFieldNames();
+        final int numFieldNames = rawFieldNames.size();
+
         for (final CSVRecord csvRecord : csvParser) {
-            final Map<String, Object> rowValues = new 
HashMap<>(schema.getFieldCount());
+            final Map<String, Object> values = new LinkedHashMap<>();
+            for (int i = 0; i < csvRecord.size(); i++) {
+                final String rawFieldName = numFieldNames <= i ? 
"unknown_field_index_" + i : rawFieldNames.get(i);
+                final String rawValue = csvRecord.get(i);
 
-            for (final RecordField recordField : schema.getFields()) {
-                String rawValue = null;
-                final String fieldName = recordField.getFieldName();
-                if (csvRecord.isSet(fieldName)) {
-                    rawValue = csvRecord.get(fieldName);
-                } else {
-                    for (final String alias : recordField.getAliases()) {
-                        if (csvRecord.isSet(alias)) {
-                            rawValue = csvRecord.get(alias);
-                            break;
-                        }
-                    }
-                }
+                final Optional<DataType> dataTypeOption = 
schema.getDataType(rawFieldName);
 
-                if (rawValue == null) {
-                    rowValues.put(fieldName, null);
+                if (!dataTypeOption.isPresent() && dropUnknownFields) {
                     continue;
                 }
 
-                final Object converted = convert(rawValue, 
recordField.getDataType(), fieldName);
-                if (converted != null) {
-                    rowValues.put(fieldName, converted);
+                final Object value;
+                if (coerceTypes && dataTypeOption.isPresent()) {
+                    value = convert(rawValue, dataTypeOption.get(), 
rawFieldName);
+                } else if (dataTypeOption.isPresent()) {
+                    // The CSV Reader is going to return all fields as 
Strings, because CSV doesn't have any way to
+                    // dictate a field type. As a result, we will use the 
schema that we have to attempt to convert
+                    // the value into the desired type if it's a simple type.
+                    value = convertSimpleIfPossible(rawValue, 
dataTypeOption.get(), rawFieldName);
+                } else {
+                    value = rawValue;
                 }
+
+                values.put(rawFieldName, value);
             }
 
-            return new MapRecord(schema, rowValues);
+            return new MapRecord(schema, values, coerceTypes, 
dropUnknownFields);
         }
 
         return null;
     }
 
+
+    private List<String> getRawFieldNames() {
+        if (this.rawFieldNames != null) {
+            return this.rawFieldNames;
+        }
+
+        // Use a SortedMap keyed by index of the field so that we can get a 
List of field names in the correct order
+        final SortedMap<Integer, String> sortedMap = new TreeMap<>();
+        for (final Map.Entry<String, Integer> entry : 
csvParser.getHeaderMap().entrySet()) {
+            sortedMap.put(entry.getValue(), entry.getKey());
+        }
+
+        this.rawFieldNames = new ArrayList<>(sortedMap.values());
+        return this.rawFieldNames;
+    }
+
+
     @Override
     public RecordSchema getSchema() {
         return schema;
@@ -115,7 +151,6 @@ public class CSVRecordReader implements RecordReader {
         }
 
         final String trimmed = value.startsWith("\"") && value.endsWith("\"") 
? value.substring(1, value.length() - 1) : value;
-
         if (trimmed.isEmpty()) {
             return null;
         }
@@ -123,6 +158,40 @@ public class CSVRecordReader implements RecordReader {
         return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, 
LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
     }
 
+    private Object convertSimpleIfPossible(final String value, final DataType 
dataType, final String fieldName) {
+        if (dataType == null || value == null) {
+            return value;
+        }
+
+        final String trimmed = value.startsWith("\"") && value.endsWith("\"") 
? value.substring(1, value.length() - 1) : value;
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+
+        switch (dataType.getFieldType()) {
+            case STRING:
+                return value;
+            case BOOLEAN:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case BYTE:
+            case CHAR:
+            case SHORT:
+            case TIME:
+            case TIMESTAMP:
+            case DATE:
+                if (DataTypeUtils.isCompatibleDataType(trimmed, dataType)) {
+                    return DataTypeUtils.convertType(trimmed, dataType, 
LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                } else {
+                    return value;
+                }
+        }
+
+        return value;
+    }
+
     @Override
     public void close() throws IOException {
         csvParser.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
index 14cd60d..99f877f 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
@@ -61,7 +61,7 @@ public class CSVUtils {
         .defaultValue("\"")
         .required(true)
         .build();
-    static final PropertyDescriptor SKIP_HEADER_LINE = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor FIRST_LINE_IS_HEADER = new 
PropertyDescriptor.Builder()
         .name("Skip Header Line")
         .displayName("Treat First Line as Header")
         .description("Specifies whether or not the first line of CSV should be 
considered a Header or should be considered a record. If the Schema Access 
Strategy "
@@ -74,6 +74,18 @@ public class CSVUtils {
         .defaultValue("false")
         .required(true)
         .build();
+    static final PropertyDescriptor IGNORE_CSV_HEADER = new 
PropertyDescriptor.Builder()
+        .name("ignore-csv-header")
+        .displayName("Ignore CSV Header Column Names")
+        .description("If the first line of a CSV is a header, and the 
configured schema does not match the fields named in the header line, this 
controls how "
+            + "the Reader will interpret the fields. If this property is true, 
then the field names mapped to each column are driven only by the configured 
schema and "
+            + "any fields not in the schema will be ignored. If this property 
is false, then the field names found in the CSV Header will be used as the 
names of the "
+            + "fields.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(false)
+        .build();
     static final PropertyDescriptor COMMENT_MARKER = new 
PropertyDescriptor.Builder()
         .name("Comment Marker")
         .description("The character that is used to denote the start of a 
comment. Any line that begins with this comment will be ignored.")
@@ -177,7 +189,7 @@ public class CSVUtils {
             .withAllowMissingColumnNames()
             .withIgnoreEmptyLines();
 
-        final PropertyValue skipHeaderPropertyValue = 
context.getProperty(SKIP_HEADER_LINE);
+        final PropertyValue skipHeaderPropertyValue = 
context.getProperty(FIRST_LINE_IS_HEADER);
         if (skipHeaderPropertyValue.getValue() != null && 
skipHeaderPropertyValue.asBoolean()) {
             format = format.withFirstRecordAsHeader();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 00270ed..82d687a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -20,19 +20,24 @@ package org.apache.nifi.csv;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RawRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-public class WriteCSVResult extends AbstractRecordSetWriter implements 
RecordSetWriter {
+public class WriteCSVResult extends AbstractRecordSetWriter implements 
RecordSetWriter, RawRecordWriter {
     private final RecordSchema recordSchema;
     private final SchemaAccessWriter schemaWriter;
     private final String dateFormat;
@@ -40,6 +45,9 @@ public class WriteCSVResult extends AbstractRecordSetWriter 
implements RecordSet
     private final String timestampFormat;
     private final CSVPrinter printer;
     private final Object[] fieldValues;
+    private final boolean includeHeaderLine;
+    private boolean headerWritten = false;
+    private String[] fieldNames;
 
     public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
         final String dateFormat, final String timeFormat, final String 
timestampFormat, final boolean includeHeaderLine) throws IOException {
@@ -50,9 +58,9 @@ public class WriteCSVResult extends AbstractRecordSetWriter 
implements RecordSet
         this.dateFormat = dateFormat;
         this.timeFormat = timeFormat;
         this.timestampFormat = timestampFormat;
+        this.includeHeaderLine = includeHeaderLine;
 
-        final String[] columnNames = recordSchema.getFieldNames().toArray(new 
String[0]);
-        final CSVFormat formatWithHeader = 
csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine);
+        final CSVFormat formatWithHeader = 
csvFormat.withSkipHeaderRecord(true);
         final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
         printer = new CSVPrinter(streamWriter, formatWithHeader);
 
@@ -93,6 +101,34 @@ public class WriteCSVResult extends AbstractRecordSetWriter 
implements RecordSet
         printer.flush();
     }
 
+    private String[] getFieldNames(final Record record) {
+        if (fieldNames != null) {
+            return fieldNames;
+        }
+
+        final Set<String> allFields = new LinkedHashSet<>();
+        allFields.addAll(record.getRawFieldNames());
+        allFields.addAll(recordSchema.getFieldNames());
+        fieldNames = allFields.toArray(new String[0]);
+        return fieldNames;
+    }
+
+    private void includeHeaderIfNecessary(final Record record, final boolean 
includeOnlySchemaFields) throws IOException {
+        if (headerWritten || !includeHeaderLine) {
+            return;
+        }
+
+        final Object[] fieldNames;
+        if (includeOnlySchemaFields) {
+            fieldNames = recordSchema.getFieldNames().toArray(new Object[0]);
+        } else {
+            fieldNames = getFieldNames(record);
+        }
+
+        printer.printRecord(fieldNames);
+        headerWritten = true;
+    }
+
     @Override
     public Map<String, String> writeRecord(final Record record) throws 
IOException {
         // If we are not writing an active record set, then we need to ensure 
that we write the
@@ -101,6 +137,8 @@ public class WriteCSVResult extends AbstractRecordSetWriter 
implements RecordSet
             schemaWriter.writeHeader(recordSchema, getOutputStream());
         }
 
+        includeHeaderIfNecessary(record, true);
+
         int i = 0;
         for (final RecordField recordField : recordSchema.getFields()) {
             fieldValues[i++] = record.getAsString(recordField, 
getFormat(recordField));
@@ -111,6 +149,36 @@ public class WriteCSVResult extends 
AbstractRecordSetWriter implements RecordSet
     }
 
     @Override
+    public WriteResult writeRawRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure 
that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            schemaWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
+        includeHeaderIfNecessary(record, false);
+
+        final String[] fieldNames = getFieldNames(record);
+        // Avoid creating a new Object[] for every Record if we can. But if 
the record has a different number of columns than does our
+        // schema, we don't have a lot of options here, so we just create a 
new Object[] in that case.
+        final Object[] recordFieldValues = (fieldNames.length == 
this.fieldValues.length) ? this.fieldValues : new String[fieldNames.length];
+
+        int i = 0;
+        for (final String fieldName : fieldNames) {
+            final Optional<RecordField> recordField = 
recordSchema.getField(fieldName);
+            if (recordField.isPresent()) {
+                recordFieldValues[i++] = record.getAsString(fieldName, 
getFormat(recordField.get()));
+            } else {
+                recordFieldValues[i++] = record.getAsString(fieldName);
+            }
+        }
+
+        printer.printRecord(recordFieldValues);
+        final Map<String, String> attributes = 
schemaWriter.getAttributes(recordSchema);
+        return WriteResult.of(incrementRecordCount(), attributes);
+    }
+
+    @Override
     public String getMimeType() {
         return "text/csv";
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index fae3eba..54a2333 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -67,6 +67,7 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
     private volatile Grok grok;
     private volatile boolean appendUnmatchedLine;
     private volatile RecordSchema recordSchema;
+    private volatile RecordSchema recordSchemaFromGrok;
 
     private static final String DEFAULT_PATTERN_NAME = 
"/default-grok-patterns.txt";
 
@@ -133,9 +134,11 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
         appendUnmatchedLine = 
context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
 
+        this.recordSchemaFromGrok = createRecordSchema(grok);
+
         final String schemaAccess = 
context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
         if 
(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
-            this.recordSchema = createRecordSchema(grok);
+            this.recordSchema = recordSchemaFromGrok;
         } else {
             this.recordSchema = null;
         }
@@ -236,6 +239,6 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
     @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final 
InputStream in, final ComponentLog logger) throws IOException, 
SchemaNotFoundException {
         final RecordSchema schema = getSchema(flowFile, in, null);
-        return new GrokRecordReader(in, grok, schema, appendUnmatchedLine);
+        return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, 
appendUnmatchedLine);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index e668b1c..65edf05 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -24,6 +24,7 @@ import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -43,6 +44,7 @@ public class GrokRecordReader implements RecordReader {
     private final BufferedReader reader;
     private final Grok grok;
     private final boolean append;
+    private final RecordSchema schemaFromGrok;
     private RecordSchema schema;
 
     private String nextLine;
@@ -55,11 +57,12 @@ public class GrokRecordReader implements RecordReader {
             + "(?:Suppressed\\: )|"
             + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
 
-    public GrokRecordReader(final InputStream in, final Grok grok, final 
RecordSchema schema, final boolean append) {
+    public GrokRecordReader(final InputStream in, final Grok grok, final 
RecordSchema schema, final RecordSchema schemaFromGrok, final boolean append) {
         this.reader = new BufferedReader(new InputStreamReader(in));
         this.grok = grok;
         this.schema = schema;
         this.append = append;
+        this.schemaFromGrok = schemaFromGrok;
     }
 
     @Override
@@ -68,7 +71,7 @@ public class GrokRecordReader implements RecordReader {
     }
 
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
         Map<String, Object> valueMap = null;
         while (valueMap == null || valueMap.isEmpty()) {
             final String line = nextLine == null ? reader.readLine() : 
nextLine;
@@ -85,7 +88,7 @@ public class GrokRecordReader implements RecordReader {
         // Read the next line to see if it matches the pattern (in which case 
we will simply leave it for
         // the next call to nextRecord()) or we will attach it to the 
previously read record.
         String stackTrace = null;
-        final StringBuilder toAppend = new StringBuilder();
+        final StringBuilder trailingText = new StringBuilder();
         while ((nextLine = reader.readLine()) != null) {
             final Match nextLineMatch = grok.match(nextLine);
             nextLineMatch.captures();
@@ -97,7 +100,7 @@ public class GrokRecordReader implements RecordReader {
                     stackTrace = readStackTrace(nextLine);
                     break;
                 } else if (append) {
-                    toAppend.append("\n").append(nextLine);
+                    trailingText.append("\n").append(nextLine);
                 }
             } else {
                 // The next line matched our pattern.
@@ -105,49 +108,78 @@ public class GrokRecordReader implements RecordReader {
             }
         }
 
-        try {
-            final List<DataType> fieldTypes = schema.getDataTypes();
-            final Map<String, Object> values = new 
HashMap<>(fieldTypes.size());
+        final Record record = createRecord(valueMap, trailingText, stackTrace, 
coerceTypes, dropUnknownFields);
+        return record;
+    }
 
-            for (final RecordField field : schema.getFields()) {
-                Object value = valueMap.get(field.getFieldName());
-                if (value == null) {
-                    for (final String alias : field.getAliases()) {
-                        value = valueMap.get(alias);
-                        if (value != null) {
-                            break;
-                        }
-                    }
+    private Record createRecord(final Map<String, Object> valueMap, final 
StringBuilder trailingText, final String stackTrace, final boolean coerceTypes, 
final boolean dropUnknown) {
+        final Map<String, Object> converted = new HashMap<>();
+        for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
+            final String fieldName = entry.getKey();
+            final Object rawValue = entry.getValue();
+
+            final Object normalizedValue;
+            if (rawValue instanceof List) {
+                final List<?> list = (List<?>) rawValue;
+                final String[] array = new String[list.size()];
+                for (int i = 0; i < list.size(); i++) {
+                    final Object rawObject = list.get(i);
+                    array[i] = rawObject == null ? null : rawObject.toString();
                 }
+                normalizedValue = array;
+            } else {
+                normalizedValue = rawValue == null ? null : 
rawValue.toString();
+            }
 
-                final String fieldName = field.getFieldName();
-                if (value == null) {
-                    values.put(fieldName, null);
-                    continue;
-                }
+            final Optional<RecordField> optionalRecordField = 
schema.getField(fieldName);
 
+            final Object coercedValue;
+            if (coerceTypes && optionalRecordField.isPresent()) {
+                final RecordField field = optionalRecordField.get();
                 final DataType fieldType = field.getDataType();
-                final Object converted = convert(fieldType, value.toString(), 
fieldName);
-                values.put(fieldName, converted);
+                coercedValue = convert(fieldType, normalizedValue, fieldName);
+            } else {
+                coercedValue = normalizedValue;
             }
 
-            if (append && toAppend.length() > 0) {
-                final String lastFieldName = 
schema.getField(schema.getFieldCount() - 1).getFieldName();
+            converted.put(fieldName, coercedValue);
+        }
 
-                final int fieldIndex = 
STACK_TRACE_COLUMN_NAME.equals(lastFieldName) ? schema.getFieldCount() - 2 : 
schema.getFieldCount() - 1;
-                final String lastFieldBeforeStackTrace = 
schema.getFieldNames().get(fieldIndex);
+        // If there is any trailing text, determine the last column from the 
grok schema
+        // and then append the trailing text to it.
+        if (append && trailingText.length() > 0) {
+            String lastPopulatedFieldName = null;
+            final List<RecordField> schemaFields = schemaFromGrok.getFields();
+            for (int i = schemaFields.size() - 1; i >= 0; i--) {
+                final RecordField field = schemaFields.get(i);
+
+                Object value = converted.get(field.getFieldName());
+                if (value != null) {
+                    lastPopulatedFieldName = field.getFieldName();
+                    break;
+                }
 
-                final Object existingValue = 
values.get(lastFieldBeforeStackTrace);
-                final String updatedValue = existingValue == null ? 
toAppend.toString() : existingValue + toAppend.toString();
-                values.put(lastFieldBeforeStackTrace, updatedValue);
+                for (final String alias : field.getAliases()) {
+                    value = converted.get(alias);
+                    if (value != null) {
+                        lastPopulatedFieldName = alias;
+                        break;
+                    }
+                }
             }
 
-            values.put(STACK_TRACE_COLUMN_NAME, stackTrace);
-
-            return new MapRecord(schema, values);
-        } catch (final Exception e) {
-            throw new MalformedRecordException("Found invalid log record and 
will skip it. Record: " + nextLine, e);
+            if (lastPopulatedFieldName != null) {
+                final Object value = converted.get(lastPopulatedFieldName);
+                if (value == null) {
+                    converted.put(lastPopulatedFieldName, 
trailingText.toString());
+                } else if (value instanceof String) { // if not a String it is 
a List and we will just drop the trailing text
+                    converted.put(lastPopulatedFieldName, (String) value + 
trailingText.toString());
+                }
+            }
         }
+
+        converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
+        return new MapRecord(schema, converted);
     }
 
 
@@ -200,22 +232,23 @@ public class GrokRecordReader implements RecordReader {
     }
 
 
-    protected Object convert(final DataType fieldType, final String string, 
final String fieldName) {
+    protected Object convert(final DataType fieldType, final Object rawValue, 
final String fieldName) {
         if (fieldType == null) {
-            return string;
+            return rawValue;
         }
 
-        if (string == null) {
+        if (rawValue == null) {
             return null;
         }
 
         // If string is empty then return an empty string if field type is 
STRING. If field type is
         // anything else, we can't really convert it so return null
-        if (string.isEmpty() && fieldType.getFieldType() != 
RecordFieldType.STRING) {
+        final boolean fieldEmpty = rawValue instanceof String && ((String) 
rawValue).isEmpty();
+        if (fieldEmpty && fieldType.getFieldType() != RecordFieldType.STRING) {
             return null;
         }
 
-        return DataTypeUtils.convertType(string, fieldType, fieldName);
+        return DataTypeUtils.convertType(rawValue, fieldType, fieldName);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 1f61f17..663b837 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -19,19 +19,30 @@ package org.apache.nifi.json;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
 
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
@@ -70,8 +81,9 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
         }
     }
 
+
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
         if (firstObjectConsumed && !array) {
             return null;
         }
@@ -79,7 +91,7 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
         final JsonNode nextNode = getNextJsonNode();
         final RecordSchema schema = getSchema();
         try {
-            return convertJsonNodeToRecord(nextNode, schema);
+            return convertJsonNodeToRecord(nextNode, schema, coerceTypes, 
dropUnknownFields);
         } catch (final MalformedRecordException mre) {
             throw mre;
         } catch (final IOException ioe) {
@@ -91,7 +103,11 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
     }
 
     protected Object getRawNodeValue(final JsonNode fieldNode) throws 
IOException {
-        if (fieldNode == null || !fieldNode.isValueNode()) {
+        return getRawNodeValue(fieldNode, null);
+    }
+
+    protected Object getRawNodeValue(final JsonNode fieldNode, final DataType 
dataType) throws IOException {
+        if (fieldNode == null || fieldNode.isNull()) {
             return null;
         }
 
@@ -111,6 +127,53 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
             return fieldNode.getTextValue();
         }
 
+        if (fieldNode.isArray()) {
+            final ArrayNode arrayNode = (ArrayNode) fieldNode;
+            final int numElements = arrayNode.size();
+            final Object[] arrayElements = new Object[numElements];
+            int count = 0;
+
+            final DataType elementDataType;
+            if (dataType != null && dataType.getFieldType() == 
RecordFieldType.ARRAY) {
+                final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+                elementDataType = arrayDataType.getElementType();
+            } else {
+                elementDataType = null;
+            }
+
+            for (final JsonNode node : arrayNode) {
+                final Object value = getRawNodeValue(node, elementDataType);
+                arrayElements[count++] = value;
+            }
+
+            return arrayElements;
+        }
+
+        if (fieldNode.isObject()) {
+            RecordSchema childSchema;
+            if (dataType != null && RecordFieldType.RECORD == 
dataType.getFieldType()) {
+                final RecordDataType recordDataType = (RecordDataType) 
dataType;
+                childSchema = recordDataType.getChildSchema();
+            } else {
+                childSchema = null;
+            }
+
+            if (childSchema == null) {
+                childSchema = new SimpleRecordSchema(Collections.emptyList());
+            }
+
+            final Iterator<String> fieldNames = fieldNode.getFieldNames();
+            final Map<String, Object> childValues = new HashMap<>();
+            while (fieldNames.hasNext()) {
+                final String childFieldName = fieldNames.next();
+                final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), dataType);
+                childValues.put(childFieldName, childValue);
+            }
+
+            final MapRecord record = new MapRecord(childSchema, childValues);
+            return record;
+        }
+
         return null;
     }
 
@@ -159,5 +222,5 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         return Optional.ofNullable(firstJsonNode);
     }
 
-    protected abstract Record convertJsonNodeToRecord(final JsonNode nextNode, 
final RecordSchema schema) throws IOException, MalformedRecordException;
+    protected abstract Record convertJsonNodeToRecord(JsonNode nextNode, 
RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws 
IOException, MalformedRecordException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 2110bbb..90ab799 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.nifi.json;
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.DateFormat;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.function.Supplier;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -89,7 +91,7 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
     }
 
     @Override
-    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema) throws IOException {
+    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema, final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException {
         if (jsonNode == null) {
             return null;
         }
@@ -100,7 +102,8 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
         for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
             final String fieldName = entry.getKey();
             final DataType desiredType = 
schema.getDataType(fieldName).orElse(null);
-            if (desiredType == null) {
+
+            if (desiredType == null && dropUnknownFields) {
                 continue;
             }
 
@@ -117,7 +120,13 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
             final Optional<RecordField> field = schema.getField(fieldName);
             final Object defaultValue = field.isPresent() ? 
field.get().getDefaultValue() : null;
 
-            value = convert(value, desiredType, fieldName, defaultValue);
+            if (coerceTypes && desiredType != null) {
+                value = convert(value, desiredType, fieldName, defaultValue);
+            } else {
+                final DataType dataType = field.isPresent() ? 
field.get().getDataType() : null;
+                value = convert(value, dataType);
+            }
+
             values.put(fieldName, value);
         }
 
@@ -126,6 +135,70 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
 
 
     @SuppressWarnings("unchecked")
+    protected Object convert(final Object value, final DataType dataType) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof List) {
+            final List<?> list = (List<?>) value;
+            final Object[] array = new Object[list.size()];
+
+            final DataType elementDataType;
+            if (dataType != null && dataType.getFieldType() == 
RecordFieldType.ARRAY) {
+                elementDataType = ((ArrayDataType) dataType).getElementType();
+            } else {
+                elementDataType = null;
+            }
+
+            int i = 0;
+            for (final Object val : list) {
+                array[i++] = convert(val, elementDataType);
+            }
+
+            return array;
+        }
+
+        if (value instanceof Map) {
+            final Map<String, ?> map = (Map<String, ?>) value;
+
+            boolean record = false;
+            for (final Object obj : map.values()) {
+                if (obj instanceof JsonNode) {
+                    record = true;
+                }
+            }
+
+            if (!record) {
+                return value;
+            }
+
+            RecordSchema childSchema = null;
+            if (dataType != null && dataType.getFieldType() == 
RecordFieldType.RECORD) {
+                childSchema = ((RecordDataType) dataType).getChildSchema();
+            }
+            if (childSchema == null) {
+                childSchema = new SimpleRecordSchema(Collections.emptyList());
+            }
+
+            final Map<String, Object> values = new HashMap<>();
+            for (final Map.Entry<String, ?> entry : map.entrySet()) {
+                final String key = entry.getKey();
+                final Object childValue = entry.getValue();
+
+                final RecordField recordField = 
childSchema.getField(key).orElse(null);
+                final DataType childDataType = recordField == null ? null : 
recordField.getDataType();
+
+                values.put(key, convert(childValue, childDataType));
+            }
+
+            return new MapRecord(childSchema, values);
+        }
+
+        return value;
+    }
+
+    @SuppressWarnings("unchecked")
     protected Object convert(final Object value, final DataType dataType, 
final String fieldName, final Object defaultValue) {
         if (value == null) {
             return defaultValue;

Reply via email to