Repository: nifi
Updated Branches:
  refs/heads/master ae940d862 -> 451f9cf12


http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index f05fe30..489d114 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -23,6 +23,7 @@ import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -69,78 +70,74 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
 
 
     @Override
-    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema) throws IOException, MalformedRecordException {
-        return convertJsonNodeToRecord(jsonNode, schema, null);
+    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields)
+        throws IOException, MalformedRecordException {
+        return convertJsonNodeToRecord(jsonNode, schema, coerceTypes, 
dropUnknownFields, null);
     }
 
-    private Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema, final String fieldNamePrefix) throws IOException, 
MalformedRecordException {
+    private Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema, final boolean coerceTypes, final boolean dropUnknown, 
final String fieldNamePrefix)
+        throws IOException, MalformedRecordException {
         if (jsonNode == null) {
             return null;
         }
 
-        final Map<String, Object> values = new 
HashMap<>(schema.getFieldCount());
-        for (final RecordField field : schema.getFields()) {
-            final String fieldName = field.getFieldName();
+        return convertJsonNodeToRecord(jsonNode, schema, fieldNamePrefix, 
coerceTypes, dropUnknown);
+    }
 
-            final JsonNode fieldNode = getJsonNode(jsonNode, field);
 
-            final DataType desiredType = field.getDataType();
-            final String fullFieldName = fieldNamePrefix == null ? fieldName : 
fieldNamePrefix + fieldName;
-            final Object value = convertField(fieldNode, fullFieldName, 
desiredType);
-            values.put(fieldName, value);
-        }
+    private Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema, final String fieldNamePrefix,
+            final boolean coerceTypes, final boolean dropUnknown) throws 
IOException, MalformedRecordException {
 
-        final Supplier<String> supplier = () -> jsonNode.toString();
-        return new MapRecord(schema, values, SerializedForm.of(supplier, 
"application/json"));
-    }
+        final Map<String, Object> values = new LinkedHashMap<>();
+        final Iterator<String> fieldNames = jsonNode.getFieldNames();
+        while (fieldNames.hasNext()) {
+            final String fieldName = fieldNames.next();
+            final JsonNode childNode = jsonNode.get(fieldName);
 
-    private JsonNode getJsonNode(final JsonNode parent, final RecordField 
field) {
-        JsonNode fieldNode = parent.get(field.getFieldName());
-        if (fieldNode != null) {
-            return fieldNode;
-        }
+            final RecordField recordField = 
schema.getField(fieldName).orElse(null);
+            if (recordField == null && dropUnknown) {
+                continue;
+            }
 
-        for (final String alias : field.getAliases()) {
-            fieldNode = parent.get(alias);
-            if (fieldNode != null) {
-                return fieldNode;
+            final Object value;
+            if (coerceTypes && recordField != null) {
+                final DataType desiredType = recordField.getDataType();
+                final String fullFieldName = fieldNamePrefix == null ? 
fieldName : fieldNamePrefix + fieldName;
+                value = convertField(childNode, fullFieldName, desiredType, 
dropUnknown);
+            } else {
+                value = getRawNodeValue(childNode, recordField == null ? null 
: recordField.getDataType());
             }
+
+            values.put(fieldName, value);
         }
 
-        return fieldNode;
+        final Supplier<String> supplier = () -> jsonNode.toString();
+        return new MapRecord(schema, values, SerializedForm.of(supplier, 
"application/json"), false, dropUnknown);
     }
 
-    protected Object convertField(final JsonNode fieldNode, final String 
fieldName, final DataType desiredType) throws IOException, 
MalformedRecordException {
+
+    protected Object convertField(final JsonNode fieldNode, final String 
fieldName, final DataType desiredType, final boolean dropUnknown) throws 
IOException, MalformedRecordException {
         if (fieldNode == null || fieldNode.isNull()) {
             return null;
         }
 
         switch (desiredType.getFieldType()) {
             case BOOLEAN:
-                return DataTypeUtils.toBoolean(getRawNodeValue(fieldNode), 
fieldName);
             case BYTE:
-                return DataTypeUtils.toByte(getRawNodeValue(fieldNode), 
fieldName);
             case CHAR:
-                return DataTypeUtils.toCharacter(getRawNodeValue(fieldNode), 
fieldName);
             case DOUBLE:
-                return DataTypeUtils.toDouble(getRawNodeValue(fieldNode), 
fieldName);
             case FLOAT:
-                return DataTypeUtils.toFloat(getRawNodeValue(fieldNode), 
fieldName);
             case INT:
-                return DataTypeUtils.toInteger(getRawNodeValue(fieldNode), 
fieldName);
             case LONG:
-                return DataTypeUtils.toLong(getRawNodeValue(fieldNode), 
fieldName);
             case SHORT:
-                return DataTypeUtils.toShort(getRawNodeValue(fieldNode), 
fieldName);
             case STRING:
-                return DataTypeUtils.toString(getRawNodeValue(fieldNode),
-                    () -> 
DataTypeUtils.getDateFormat(desiredType.getFieldType(), LAZY_DATE_FORMAT, 
LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT));
             case DATE:
-                return DataTypeUtils.toDate(getRawNodeValue(fieldNode), 
LAZY_DATE_FORMAT, fieldName);
             case TIME:
-                return DataTypeUtils.toTime(getRawNodeValue(fieldNode), 
LAZY_TIME_FORMAT, fieldName);
-            case TIMESTAMP:
-                return DataTypeUtils.toTimestamp(getRawNodeValue(fieldNode), 
LAZY_TIMESTAMP_FORMAT, fieldName);
+            case TIMESTAMP: {
+                final Object rawValue = getRawNodeValue(fieldNode);
+                final Object converted = DataTypeUtils.convertType(rawValue, 
desiredType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, 
fieldName);
+                return converted;
+            }
             case MAP: {
                 final DataType valueType = ((MapDataType) 
desiredType).getValueType();
 
@@ -149,7 +146,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                 while (fieldNameItr.hasNext()) {
                     final String childName = fieldNameItr.next();
                     final JsonNode childNode = fieldNode.get(childName);
-                    final Object childValue = convertField(childNode, 
fieldName, valueType);
+                    final Object childValue = convertField(childNode, 
fieldName, valueType, dropUnknown);
                     map.put(childName, childValue);
                 }
 
@@ -162,7 +159,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                 int count = 0;
                 for (final JsonNode node : arrayNode) {
                     final DataType elementType = ((ArrayDataType) 
desiredType).getElementType();
-                    final Object converted = convertField(node, fieldName, 
elementType);
+                    final Object converted = convertField(node, fieldName, 
elementType, dropUnknown);
                     arrayElements[count++] = converted;
                 }
 
@@ -187,7 +184,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                         childSchema = new SimpleRecordSchema(fields);
                     }
 
-                    return convertJsonNodeToRecord(fieldNode, childSchema, 
fieldName + ".");
+                    return convertJsonNodeToRecord(fieldNode, childSchema, 
fieldName + ".", true, dropUnknown);
                 } else {
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 96b1995..8f3bf22 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -29,7 +29,9 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RawRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
@@ -44,7 +46,7 @@ import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonGenerator;
 
-public class WriteJsonResult extends AbstractRecordSetWriter implements 
RecordSetWriter {
+public class WriteJsonResult extends AbstractRecordSetWriter implements 
RecordSetWriter, RawRecordWriter {
     private final ComponentLog logger;
     private final SchemaAccessWriter schemaAccess;
     private final RecordSchema recordSchema;
@@ -107,6 +109,7 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
         }
     }
 
+
     @Override
     public Map<String, String> writeRecord(final Record record) throws 
IOException {
         // If we are not writing an active record set, then we need to ensure 
that we write the
@@ -116,12 +119,25 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
             schemaAccess.writeHeader(recordSchema, getOutputStream());
         }
 
-        writeRecord(record, recordSchema, generator, g -> 
g.writeStartObject(), g -> g.writeEndObject());
+        writeRecord(record, recordSchema, generator, g -> 
g.writeStartObject(), g -> g.writeEndObject(), true);
         return schemaAccess.getAttributes(recordSchema);
     }
 
-    private void writeRecord(final Record record, final RecordSchema 
writeSchema, final JsonGenerator generator, final GeneratorTask startTask, 
final GeneratorTask endTask)
-        throws JsonGenerationException, IOException {
+    @Override
+    public WriteResult writeRawRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure 
that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            generator.flush();
+            schemaAccess.writeHeader(recordSchema, getOutputStream());
+        }
+
+        writeRecord(record, recordSchema, generator, g -> 
g.writeStartObject(), g -> g.writeEndObject(), false);
+        return WriteResult.of(incrementRecordCount(), 
schemaAccess.getAttributes(recordSchema));
+    }
+
+    private void writeRecord(final Record record, final RecordSchema 
writeSchema, final JsonGenerator generator,
+        final GeneratorTask startTask, final GeneratorTask endTask, final 
boolean schemaAware) throws JsonGenerationException, IOException {
 
         final Optional<SerializedForm> serializedForm = 
record.getSerializedForm();
         if (serializedForm.isPresent()) {
@@ -137,21 +153,36 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
 
         try {
             startTask.apply(generator);
-            for (int i = 0; i < writeSchema.getFieldCount(); i++) {
-                final RecordField field = writeSchema.getField(i);
-                final String fieldName = field.getFieldName();
-                final Object value = record.getValue(field);
-                if (value == null) {
-                    generator.writeNullField(fieldName);
-                    continue;
-                }
 
-                generator.writeFieldName(fieldName);
-                final DataType dataType = 
writeSchema.getDataType(fieldName).get();
+            if (schemaAware) {
+                for (final RecordField field : writeSchema.getFields()) {
+                    final String fieldName = field.getFieldName();
+                    final Object value = record.getValue(field);
+                    if (value == null) {
+                        generator.writeNullField(fieldName);
+                        continue;
+                    }
+
+                    generator.writeFieldName(fieldName);
 
-                writeValue(generator, value, fieldName, dataType, i < 
writeSchema.getFieldCount() - 1);
+                    final DataType dataType = 
writeSchema.getDataType(fieldName).get();
+                    writeValue(generator, value, fieldName, dataType);
+                }
+            } else {
+                for (final String fieldName : record.getRawFieldNames()) {
+                    final Object value = record.getValue(fieldName);
+                    if (value == null) {
+                        generator.writeNullField(fieldName);
+                        continue;
+                    }
+
+                    generator.writeFieldName(fieldName);
+                    writeRawValue(generator, value, fieldName);
+                }
             }
 
+
+
             endTask.apply(generator);
         } catch (final Exception e) {
             logger.error("Failed to write {} with schema {} as a JSON Object 
due to {}", new Object[] {record, record.getSchema(), e.toString(), e});
@@ -159,9 +190,51 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private void writeRawValue(final JsonGenerator generator, final Object 
value, final String fieldName)
+        throws JsonGenerationException, IOException {
+
+        if (value == null) {
+            generator.writeNull();
+            return;
+        }
+
+        if (value instanceof Record) {
+            final Record record = (Record) value;
+            writeRecord(record, record.getSchema(), generator, gen -> 
gen.writeStartObject(), gen -> gen.writeEndObject(), false);
+            return;
+        }
+
+        if (value instanceof Map) {
+            final Map<String, ?> map = (Map<String, ?>) value;
+            generator.writeStartObject();
+
+            for (final Map.Entry<String, ?> entry : map.entrySet()) {
+                final String mapKey = entry.getKey();
+                final Object mapValue = entry.getValue();
+                generator.writeFieldName(mapKey);
+                writeRawValue(generator, mapValue, fieldName + "." + mapKey);
+            }
+
+            generator.writeEndObject();
+            return;
+        }
+
+        if (value instanceof Object[]) {
+            final Object[] values = (Object[]) value;
+            generator.writeStartArray();
+            for (final Object element : values) {
+                writeRawValue(generator, element, fieldName);
+            }
+            generator.writeEndArray();
+            return;
+        }
+
+        generator.writeObject(value);
+    }
 
     @SuppressWarnings("unchecked")
-    private void writeValue(final JsonGenerator generator, final Object value, 
final String fieldName, final DataType dataType, final boolean moreCols)
+    private void writeValue(final JsonGenerator generator, final Object value, 
final String fieldName, final DataType dataType)
         throws JsonGenerationException, IOException {
         if (value == null) {
             generator.writeNull();
@@ -242,7 +315,7 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
                 final Record record = (Record) coercedValue;
                 final RecordDataType recordDataType = (RecordDataType) 
chosenDataType;
                 final RecordSchema childSchema = 
recordDataType.getChildSchema();
-                writeRecord(record, childSchema, generator, gen -> 
gen.writeStartObject(), gen -> gen.writeEndObject());
+                writeRecord(record, childSchema, generator, gen -> 
gen.writeStartObject(), gen -> gen.writeEndObject(), true);
                 break;
             }
             case MAP: {
@@ -250,12 +323,12 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
                 final DataType valueDataType = mapDataType.getValueType();
                 final Map<String, ?> map = (Map<String, ?>) coercedValue;
                 generator.writeStartObject();
-                int i = 0;
+
                 for (final Map.Entry<String, ?> entry : map.entrySet()) {
                     final String mapKey = entry.getKey();
                     final Object mapValue = entry.getValue();
                     generator.writeFieldName(mapKey);
-                    writeValue(generator, mapValue, fieldName + "." + mapKey, 
valueDataType, ++i < map.size());
+                    writeValue(generator, mapValue, fieldName + "." + mapKey, 
valueDataType);
                 }
                 generator.writeEndObject();
                 break;
@@ -278,9 +351,8 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
         throws JsonGenerationException, IOException {
         generator.writeStartArray();
         for (int i = 0; i < values.length; i++) {
-            final boolean moreEntries = i < values.length - 1;
             final Object element = values[i];
-            writeValue(generator, element, fieldName, elementType, 
moreEntries);
+            writeValue(generator, element, fieldName, elementType);
         }
         generator.writeEndArray();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html
index 5a08269..8e18934 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html
@@ -73,42 +73,44 @@
 
                <h2>Examples</h2>
                
+               <h3>Example 1</h3>
+               
         <p>
                As an example, consider a FlowFile whose contents consists of 
the following:
         </p>
 
-        <code>
-               id, name, balance, join_date, notes<br />
-               1, John, 48.23, 04/03/2007 "Our very<br />
+<code>
+id, name, balance, join_date, notes<br />
+1, John, 48.23, 04/03/2007, "Our very<br />
 first customer!"<br />
-               2, Jane, 1245.89, 08/22/2009,<br />
-               3, Frank Franklin, "48481.29", 04/04/2016,<br />
-        </code>
+2, Jane, 1245.89, 08/22/2009,<br />
+3, Frank Franklin, "48481.29", 04/04/2016,<br />
+</code>
         
         <p>
                Additionally, let's consider that this Controller Service is 
configured with the Schema Registry pointing to an AvroSchemaRegistry and the 
schema is
                configured as the following:
         </p>
         
-               <code>
-               <pre>
-               {
-                 "namespace": "nifi",
-                 "name": "balances",
-                 "type": "record",
-                 "fields": [
-                               { "name": "id", "type": "int" },
-                               { "name": "name": "type": "string" },
-                               { "name": "balance": "type": "double" },
-                               { "name": "join_date", "type": {
-                                       "type": "int",
-                                       "logicalType": "date"
-                               },
-                               { "name": "notes": "type": "string" }
-                 ]
-               }
-               </pre>
-               </code>
+<code>
+<pre>
+{
+  "namespace": "nifi",
+  "name": "balances",
+  "type": "record",
+  "fields": [
+    { "name": "id", "type": "int" },
+    { "name": "name", "type": "string" },
+    { "name": "balance", "type": "double" },
+    { "name": "join_date", "type": {
+      "type": "int",
+      "logicalType": "date"
+    }},
+    { "name": "notes", "type": "string" }
+  ]
+}
+</pre>
+</code>
 
        <p>
                In the example above, we see that the 'join_date' column is a 
Date type. In order for the CSV Reader to be able to properly parse a value as 
a date,
@@ -211,7 +213,122 @@ first customer!"<br />
                        </tr>
                </body>
        </table>
+
+
+
+       <h3>Example 2 - Schema with CSV Header Line</h3>
+
+       <p>
+               When CSV data consists of a header line that outlines the 
column names, the reader provides
+               a couple of different properties for configuring how to handle 
these column names. The
+               "Schema Access Strategy" property as well as the associated 
properties ("Schema Registry," "Schema Text," and
+               "Schema Name" properties) can be used to specify how to obtain 
the schema. If the "Schema Access Strategy" is set
+               to "Use String Fields From Header" then the header line of the 
CSV will be used to determine the schema. Otherwise,
+               a schema will be referenced elsewhere. But what happens if a 
schema is obtained from a Schema Registry, for instance,
+               and the CSV Header indicates a different set of column names?
+       </p>
        
+       <p>
+               For example, let's say that the following schema is obtained 
from the Schema Registry:
+       </p>
+
+<code>
+<pre>
+{
+  "namespace": "nifi",
+  "name": "balances",
+  "type": "record",
+  "fields": [
+    { "name": "id", "type": "int" },
+    { "name": "name", "type": "string" },
+    { "name": "balance", "type": "double" },
+    { "name": "memo", "type": "string" }
+  ]
+}
+</pre>
+</code>
+               
+               <p>
+                       And the CSV contains the following data:
+               </p>
+               
+<code>
+<pre>
+id, name, balance, notes
+1, John Doe, 123.45, First Customer
+</pre>
+</code>
+               
+               <p>
+               Note here that our schema indicates that the final column is 
named "memo" whereas the CSV Header indicates that it is named "notes."
+               </p>
        
+       <p>
+       In this case, the reader will look at the "Ignore CSV Header Column 
Names" property. If this property is set to "true" then the column names
+       provided in the CSV will simply be ignored and the last column will be 
called "memo." However, if the "Ignore CSV Header Column Names" property
+       is set to "false" then the result will be that the last column will be 
named "notes" and each record will have a null value for the "memo" column.
+       </p>
+
+               <p>
+               With "Ignore CSV Header Column Names" property set to 
"true":<br />
+               <table>
+               <head>
+                       <th>Field Name</th>
+                       <th>Field Value</th>
+               </head>
+               <body>
+                       <tr>
+                               <td>id</td>
+                               <td>1</td>
+                       </tr>
+                       <tr>
+                               <td>name</td>
+                               <td>John Doe</td>
+                       </tr>
+                       <tr>
+                               <td>balance</td>
+                               <td>123.45</td>
+                       </tr>
+                       <tr>
+                               <td>memo</td>
+                               <td>First Customer</td>
+                       </tr>
+               </body>
+       </table>
+               </p>
+               
+               
+               <p>
+               With "Ignore CSV Header Column Names" property set to 
"false":<br />
+                               <table>
+               <head>
+                       <th>Field Name</th>
+                       <th>Field Value</th>
+               </head>
+               <body>
+                       <tr>
+                               <td>id</td>
+                               <td>1</td>
+                       </tr>
+                       <tr>
+                               <td>name</td>
+                               <td>John Doe</td>
+                       </tr>
+                       <tr>
+                               <td>balance</td>
+                               <td>123.45</td>
+                       </tr>
+                       <tr>
+                               <td>notes</td>
+                               <td>First Customer</td>
+                       </tr>
+                       <tr>
+                               <td>memo</td>
+                               <td><code>null</code></td>
+                       </tr>
+               </body>
+       </table>
+               </p>
+               
     </body>
 </html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index 5c04cfa..b5fd869 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -253,8 +253,8 @@ public class TestAvroReaderWithEmbeddedSchema {
             accountValues.put("accountId", 83L);
 
             final List<RecordField> accountRecordFields = new ArrayList<>();
-            accountRecordFields.add(new RecordField("accountId", 
RecordFieldType.LONG.getDataType()));
-            accountRecordFields.add(new RecordField("accountName", 
RecordFieldType.STRING.getDataType()));
+            accountRecordFields.add(new RecordField("accountId", 
RecordFieldType.LONG.getDataType(), false));
+            accountRecordFields.add(new RecordField("accountName", 
RecordFieldType.STRING.getDataType(), false));
 
             final RecordSchema accountRecordSchema = new 
SimpleRecordSchema(accountRecordFields);
             final Record mapRecord = new MapRecord(accountRecordSchema, 
accountValues);
@@ -269,8 +269,8 @@ public class TestAvroReaderWithEmbeddedSchema {
             dogMap.put("dogTailLength", 14);
 
             final List<RecordField> dogRecordFields = new ArrayList<>();
-            dogRecordFields.add(new RecordField("dogTailLength", 
RecordFieldType.INT.getDataType()));
-            dogRecordFields.add(new RecordField("dogName", 
RecordFieldType.STRING.getDataType()));
+            dogRecordFields.add(new RecordField("dogTailLength", 
RecordFieldType.INT.getDataType(), false));
+            dogRecordFields.add(new RecordField("dogName", 
RecordFieldType.STRING.getDataType(), false));
             final RecordSchema dogRecordSchema = new 
SimpleRecordSchema(dogRecordFields);
             final Record dogRecord = new MapRecord(dogRecordSchema, dogMap);
 
@@ -281,8 +281,8 @@ public class TestAvroReaderWithEmbeddedSchema {
             catMap.put("catTailLength", 1);
 
             final List<RecordField> catRecordFields = new ArrayList<>();
-            catRecordFields.add(new RecordField("catTailLength", 
RecordFieldType.INT.getDataType()));
-            catRecordFields.add(new RecordField("catName", 
RecordFieldType.STRING.getDataType()));
+            catRecordFields.add(new RecordField("catTailLength", 
RecordFieldType.INT.getDataType(), false));
+            catRecordFields.add(new RecordField("catName", 
RecordFieldType.STRING.getDataType(), false));
             final RecordSchema catRecordSchema = new 
SimpleRecordSchema(catRecordFields);
             final Record catRecord = new MapRecord(catRecordSchema, catMap);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index a02e0b1..3533a43 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -57,6 +57,11 @@ public class TestCSVRecordReader {
         return fields;
     }
 
+    private CSVRecordReader createReader(final InputStream in, final 
RecordSchema schema) throws IOException {
+        return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), 
schema, format, true, false,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
+    }
+
     @Test
     public void testDate() throws IOException, MalformedRecordException {
         final String text = "date\n11/30/1983";
@@ -66,7 +71,7 @@ public class TestCSVRecordReader {
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
         try (final InputStream bais = new 
ByteArrayInputStream(text.getBytes());
-            final CSVRecordReader reader = new CSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, format,
+            final CSVRecordReader reader = new CSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, format, true, false,
                 "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
 
             final Record record = reader.nextRecord();
@@ -87,9 +92,8 @@ public class TestCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/csv/single-bank-account.csv"))) {
-            final CSVRecordReader reader = new CSVRecordReader(fis, 
Mockito.mock(ComponentLog.class), schema, format,
-                RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
+        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/csv/single-bank-account.csv"));
+            final CSVRecordReader reader = createReader(fis, schema)) {
 
             final Object[] record = reader.nextRecord().getValues();
             final Object[] expectedValues = new Object[] {"1", "John Doe", 
4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -106,9 +110,8 @@ public class TestCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/csv/multi-bank-account.csv"))) {
-            final CSVRecordReader reader = new CSVRecordReader(fis, 
Mockito.mock(ComponentLog.class), schema, format,
-                RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
+        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/csv/multi-bank-account.csv"));
+            final CSVRecordReader reader = createReader(fis, schema)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
             final Object[] firstExpectedValues = new Object[] {"1", "John 
Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -129,9 +132,8 @@ public class TestCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/csv/extra-white-space.csv"))) {
-            final CSVRecordReader reader = new CSVRecordReader(fis, 
Mockito.mock(ComponentLog.class), schema, format,
-                RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
+        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/csv/extra-white-space.csv"));
+            final CSVRecordReader reader = createReader(fis, schema)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
             final Object[] firstExpectedValues = new Object[] {"1", "John 
Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -157,9 +159,8 @@ public class TestCSVRecordReader {
         final String csvData = headerLine + "\n" + inputRecord;
         final byte[] inputData = csvData.getBytes();
 
-        try (final InputStream baos = new ByteArrayInputStream(inputData)) {
-            final CSVRecordReader reader = new CSVRecordReader(baos, 
Mockito.mock(ComponentLog.class), schema, format,
-                RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final CSVRecordReader reader = createReader(bais, schema)) {
 
             final Record record = reader.nextRecord();
             assertNotNull(record);
@@ -176,4 +177,149 @@ public class TestCSVRecordReader {
             assertNull(reader.nextRecord());
         }
     }
+
+    @Test
+    public void testReadRawWithDifferentFieldName() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode, continent";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, 
MS, 11111, North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord does not contain a 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final CSVRecordReader reader = createReader(bais, schema)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertNull(record.getValue("continent"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // test nextRecord(false, false) does contain the 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final CSVRecordReader reader = createReader(bais, schema)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertEquals("North America", record.getValue("continent"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
+
+
+    @Test
+    public void testFieldInSchemaButNotHeader() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, 
MS, 11111, USA";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final CSVRecordReader reader = createReader(bais, schema)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+
+            // If schema says that there are fields a, b, c
+            // and the CSV has a header line that says field names are a, b
+            // and then the data has values 1,2,3
+            // then a=1, b=2, c=null
+            assertNull(record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // Create another Record Reader that indicates that the header line is 
present but should be ignored. This should cause
+        // our schema to be the definitive list of what fields exist.
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final CSVRecordReader reader = new CSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, format, true, true,
+                RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+
+            // If schema says that there are fields a, b, c
+            // and the CSV has a header line that says field names are a, b
+            // and then the data has values 1,2,3
+            // then a=1, b=2, c=null
+            // But if we configure the reader to ignore the header, then 
'country' is populated from the data instead of being null.
+            assertEquals("USA", record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+
+    }
+
+    @Test
+    public void testExtraFieldNotInHeader() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode, country";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, 
MS, 11111, USA, North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test the extra value is exposed as 'unknown_field_index_8'
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final CSVRecordReader reader = createReader(bais, schema)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertEquals("USA", record.getValue("country"));
+            assertEquals("North America", 
record.getValue("unknown_field_index_8"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index c447664..0285796 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -30,6 +30,7 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -127,6 +128,172 @@ public class TestWriteCSVResult {
         assertEquals(expectedValues, values);
     }
 
+    @Test
+    public void testExtraFieldInWriteRecord() throws IOException {
+        final CSVFormat csvFormat = 
CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", "1");
+        values.put("name", "John");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, 
schema, new SchemaNameAsAttribute(), baos,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
+
+            writer.beginRecordSet();
+            writer.write(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id\n1\n", output);
+    }
+
+    @Test
+    public void testExtraFieldInWriteRawRecord() throws IOException {
+        final CSVFormat csvFormat = 
CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("name", "John");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, 
schema, new SchemaNameAsAttribute(), baos,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
+
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,John\n", output);
+    }
+
+    @Test
+    public void testMissingFieldWriteRecord() throws IOException {
+        final CSVFormat csvFormat = 
CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, 
schema, new SchemaNameAsAttribute(), baos,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
+
+            writer.beginRecordSet();
+            writer.writeRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+    @Test
+    public void testMissingFieldWriteRawRecord() throws IOException {
+        final CSVFormat csvFormat = 
CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, 
schema, new SchemaNameAsAttribute(), baos,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
+
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+
+    @Test
+    public void testMissingAndExtraFieldWriteRecord() throws IOException {
+        final CSVFormat csvFormat = 
CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, 
schema, new SchemaNameAsAttribute(), baos,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
+
+            writer.beginRecordSet();
+            writer.writeRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+    @Test
+    public void testMissingAndExtraFieldWriteRawRecord() throws IOException {
+        final CSVFormat csvFormat = 
CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, 
schema, new SchemaNameAsAttribute(), baos,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
+
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,dob,name\n1,1/1/1970,\n", output);
+    }
+
+
     private DateFormat getDateFormat(final String format) {
         final DateFormat df = new SimpleDateFormat(format);
         df.setTimeZone(TimeZone.getTimeZone("gmt"));

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index 4317728..b849c0a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -48,7 +48,7 @@ public class TestGrokRecordReader {
             
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
 
             final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", 
"FATAL", "FINE"};
             final String[] messages = new String[] {"Test Message 1", "Red", 
"Green", "Blue", "Yellow"};
@@ -78,7 +78,7 @@ public class TestGrokRecordReader {
         final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election 
Notification Thread-1] o.a.n.LoggerClass \n"
             + "org.apache.nifi.exception.UnitTestException: Testing to ensure 
we are able to capture stack traces";
         final InputStream bais = new 
ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
-        final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, 
GrokReader.createRecordSchema(grok), true);
+        final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, 
GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
 
         final Object[] values = deserializer.nextRecord().getValues();
 
@@ -101,7 +101,7 @@ public class TestGrokRecordReader {
             
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
 
             final String[] logLevels = new String[] {"INFO", "INFO", "INFO", 
"WARN", "WARN"};
 
@@ -125,7 +125,7 @@ public class TestGrokRecordReader {
             
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
 
             final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", 
"WARN", "WARN"};
 
@@ -157,7 +157,7 @@ public class TestGrokRecordReader {
             
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
 
             final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
             final String[] messages = new String[] {"message without stack 
trace",
@@ -211,7 +211,7 @@ public class TestGrokRecordReader {
             assertTrue(fieldNames.contains("message"));
             assertTrue(fieldNames.contains("stackTrace"));  // always 
implicitly there
 
-            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, true);
+            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, true);
             final Record record = deserializer.nextRecord();
 
             assertEquals("May 22 15:58:23", record.getValue("timestamp"));
@@ -248,7 +248,7 @@ public class TestGrokRecordReader {
             assertTrue(fieldNames.contains("fourth"));
             assertTrue(fieldNames.contains("fifth"));
 
-            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, false);
+            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, false);
             final Record record = deserializer.nextRecord();
 
             assertEquals("1", record.getValue("first"));
@@ -283,7 +283,7 @@ public class TestGrokRecordReader {
             assertTrue(fieldNames.contains("fourth"));
             assertTrue(fieldNames.contains("fifth"));
 
-            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, false);
+            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, false);
             for (int i = 0; i < 2; i++) {
                 final Record record = deserializer.nextRecord();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index c83d0dc..e898edd 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -162,6 +162,74 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
+    public void testReadRawRecordIncludesFieldsNotInSchema() throws 
IOException, MalformedRecordException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-array.json"));
+            final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final Record schemaValidatedRecord = reader.nextRecord();
+            assertEquals(1, schemaValidatedRecord.getValue("id"));
+            assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
+            assertNull(schemaValidatedRecord.getValue("balance"));
+        }
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-array.json"));
+            final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final Record rawRecord = reader.nextRecord(false, false);
+            assertEquals(1, rawRecord.getValue("id"));
+            assertEquals("John Doe", rawRecord.getValue("name"));
+            assertEquals(4750.89, rawRecord.getValue("balance"));
+            assertEquals("123 My Street", rawRecord.getValue("address"));
+            assertEquals("My City", rawRecord.getValue("city"));
+            assertEquals("MS", rawRecord.getValue("state"));
+            assertEquals("11111", rawRecord.getValue("zipCode"));
+            assertEquals("USA", rawRecord.getValue("country"));
+        }
+    }
+
+
+    @Test
+    public void testReadRawRecordTypeCoercion() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-array.json"));
+            final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final Record schemaValidatedRecord = reader.nextRecord();
+            assertEquals("1", schemaValidatedRecord.getValue("id")); // will 
be coerced into a STRING as per the schema
+            assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
+            assertNull(schemaValidatedRecord.getValue("balance"));
+
+            assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
+        }
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-array.json"));
+            final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final Record rawRecord = reader.nextRecord(false, false);
+            assertEquals(1, rawRecord.getValue("id")); // will return raw 
value of (int) 1
+            assertEquals("John Doe", rawRecord.getValue("name"));
+            assertEquals(4750.89, rawRecord.getValue("balance"));
+            assertEquals("123 My Street", rawRecord.getValue("address"));
+            assertEquals("My City", rawRecord.getValue("city"));
+            assertEquals("MS", rawRecord.getValue("state"));
+            assertEquals("11111", rawRecord.getValue("zipCode"));
+            assertEquals("USA", rawRecord.getValue("country"));
+
+            assertEquals(8, rawRecord.getRawFieldNames().size());
+        }
+    }
+
+
+    @Test
     public void testSingleJsonElement() throws IOException, 
MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index 16d2012..2dbf146 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -183,4 +183,162 @@ public class TestWriteJsonResult {
         final String output = new String(data, StandardCharsets.UTF_8);
         assertEquals(expected, output);
     }
+
+    @Test
+    public void testExtraFieldInWriteRecord() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", "1");
+        values.put("name", "John");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+            writer.beginRecordSet();
+            writer.writeRecord(record);
+            writer.finishRecordSet();
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        final String expected = "[{\"id\":\"1\"}]";
+
+        final String output = new String(data, StandardCharsets.UTF_8);
+        assertEquals(expected, output);
+    }
+
+    @Test
+    public void testExtraFieldInWriteRawRecord() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("name", "John");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        final String expected = "[{\"id\":\"1\",\"name\":\"John\"}]";
+
+        final String output = new String(data, StandardCharsets.UTF_8);
+        assertEquals(expected, output);
+    }
+
+    @Test
+    public void testMissingFieldInWriteRecord() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+            writer.beginRecordSet();
+            writer.writeRecord(record);
+            writer.finishRecordSet();
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        final String expected = "[{\"id\":\"1\",\"name\":null}]";
+
+        final String output = new String(data, StandardCharsets.UTF_8);
+        assertEquals(expected, output);
+    }
+
+    @Test
+    public void testMissingFieldInWriteRawRecord() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        final String expected = "[{\"id\":\"1\"}]";
+
+        final String output = new String(data, StandardCharsets.UTF_8);
+        assertEquals(expected, output);
+    }
+
+    @Test
+    public void testMissingAndExtraFieldInWriteRecord() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+            writer.beginRecordSet();
+            writer.writeRecord(record);
+            writer.finishRecordSet();
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        final String expected = "[{\"id\":\"1\",\"name\":null}]";
+
+        final String output = new String(data, StandardCharsets.UTF_8);
+        assertEquals(expected, output);
+    }
+
+    @Test
+    public void testMissingAndExtraFieldInWriteRawRecord() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        final String expected = "[{\"id\":\"1\",\"dob\":\"1/1/1970\"}]";
+
+        final String output = new String(data, StandardCharsets.UTF_8);
+        assertEquals(expected, output);
+    }
 }

Reply via email to