http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 79c602d..e0eb813 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -23,6 +23,8 @@ import java.io.OutputStreamWriter;
 import java.util.Collections;
 import java.util.Optional;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
@@ -31,14 +33,14 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.stream.io.NonCloseableOutputStream;
 
-import au.com.bytecode.opencsv.CSVWriter;
-
 public class WriteCSVResult implements RecordSetWriter {
+    private final CSVFormat csvFormat;
     private final String dateFormat;
     private final String timeFormat;
     private final String timestampFormat;
 
-    public WriteCSVResult(final String dateFormat, final String timeFormat, 
final String timestampFormat) {
+    public WriteCSVResult(final CSVFormat csvFormat, final String dateFormat, 
final String timeFormat, final String timestampFormat) {
+        this.csvFormat = csvFormat;
         this.dateFormat = dateFormat;
         this.timeFormat = timeFormat;
         this.timestampFormat = timestampFormat;
@@ -66,24 +68,25 @@ public class WriteCSVResult implements RecordSetWriter {
     @Override
     public WriteResult write(final RecordSet rs, final OutputStream rawOut) 
throws IOException {
         int count = 0;
+
+        final RecordSchema schema = rs.getSchema();
+        final String[] columnNames = schema.getFieldNames().toArray(new 
String[0]);
+        final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames);
+
         try (final OutputStream nonCloseable = new 
NonCloseableOutputStream(rawOut);
             final OutputStreamWriter streamWriter = new 
OutputStreamWriter(nonCloseable);
-            final CSVWriter writer = new CSVWriter(streamWriter)) {
+            final CSVPrinter printer = new CSVPrinter(streamWriter, 
formatWithHeader)) {
 
             try {
-                final RecordSchema schema = rs.getSchema();
-                final String[] columnNames = 
schema.getFieldNames().toArray(new String[0]);
-                writer.writeNext(columnNames);
-
                 Record record;
                 while ((record = rs.next()) != null) {
-                    final String[] colVals = new 
String[schema.getFieldCount()];
+                    final Object[] colVals = new 
Object[schema.getFieldCount()];
                     int i = 0;
                     for (final String fieldName : schema.getFieldNames()) {
                         colVals[i++] = record.getAsString(fieldName, 
getFormat(record, fieldName));
                     }
 
-                    writer.writeNext(colVals);
+                    printer.printRecord(colVals);
                     count++;
                 }
             } catch (final Exception e) {
@@ -96,22 +99,20 @@ public class WriteCSVResult implements RecordSetWriter {
 
     @Override
     public WriteResult write(final Record record, final OutputStream rawOut) 
throws IOException {
+
         try (final OutputStream nonCloseable = new 
NonCloseableOutputStream(rawOut);
             final OutputStreamWriter streamWriter = new 
OutputStreamWriter(nonCloseable);
-            final CSVWriter writer = new CSVWriter(streamWriter)) {
+            final CSVPrinter printer = new CSVPrinter(streamWriter, 
csvFormat)) {
 
             try {
                 final RecordSchema schema = record.getSchema();
-                final String[] columnNames = 
schema.getFieldNames().toArray(new String[0]);
-                writer.writeNext(columnNames);
-
-                final String[] colVals = new String[schema.getFieldCount()];
+                final Object[] colVals = new Object[schema.getFieldCount()];
                 int i = 0;
                 for (final String fieldName : schema.getFieldNames()) {
                     colVals[i++] = record.getAsString(fieldName, 
getFormat(record, fieldName));
                 }
 
-                writer.writeNext(colVals);
+                printer.printRecord(colVals);
             } catch (final Exception e) {
                 throw new IOException("Failed to serialize results", e);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index f72d5d5..f444b8a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -29,11 +29,13 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+import org.apache.nifi.serialization.SchemaRegistryRecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 import io.thekraken.grok.api.Grok;
 import io.thekraken.grok.api.exception.GrokException;
@@ -45,9 +47,11 @@ import io.thekraken.grok.api.exception.GrokException;
     + "If a line in the input does not match the expected message pattern, the 
line of text is considered to be part of the previous "
     + "message, with the exception of stack traces. A stack trace that is 
found at the end of a log message is considered to be part "
     + "of the previous message but is added to the 'STACK_TRACE' field of the 
Record. If a record has no stack trace, it will have a NULL value "
-    + "for the STACK_TRACE field.")
-public class GrokReader extends UserTypeOverrideRowReader implements 
RowRecordReaderFactory {
+    + "for the STACK_TRACE field. All fields that are parsed are considered to 
be of type String by default. If there is need to change the type of a field, "
+    + "this can be accomplished by configuring the Schema Registry to use and 
adding the appropriate schema.")
+public class GrokReader extends SchemaRegistryRecordReader implements 
RowRecordReaderFactory {
     private volatile Grok grok;
+    private volatile boolean useSchemaRegistry;
 
     private static final String DEFAULT_PATTERN_NAME = 
"/default-grok-patterns.txt";
 
@@ -60,6 +64,7 @@ public class GrokReader extends UserTypeOverrideRowReader 
implements RowRecordRe
         .expressionLanguageSupported(true)
         .required(false)
         .build();
+
     static final PropertyDescriptor GROK_EXPRESSION = new 
PropertyDescriptor.Builder()
         .name("Grok Expression")
         .description("Specifies the format of a log line in Grok format. This 
allows the Record Reader to understand how to parse each log line. "
@@ -70,7 +75,7 @@ public class GrokReader extends UserTypeOverrideRowReader 
implements RowRecordRe
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(PATTERN_FILE);
         properties.add(GROK_EXPRESSION);
         return properties;
@@ -86,14 +91,21 @@ public class GrokReader extends UserTypeOverrideRowReader 
implements RowRecordRe
         }
 
         if (context.getProperty(PATTERN_FILE).isSet()) {
-            
grok.addPatternFromFile(context.getProperty(PATTERN_FILE).getValue());
+            
grok.addPatternFromFile(context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue());
         }
 
         grok.compile(context.getProperty(GROK_EXPRESSION).getValue());
+        useSchemaRegistry = context.getProperty(OPTIONAL_SCHEMA_NAME).isSet() 
&& context.getProperty(OPTIONAL_SCHEMA_REGISTRY).isSet();
+    }
+
+    @Override
+    protected boolean isSchemaRequired() {
+        return false;
     }
 
     @Override
-    public RecordReader createRecordReader(final InputStream in, final 
ComponentLog logger) throws IOException {
-        return new GrokRecordReader(in, grok, getFieldTypeOverrides());
+    public RecordReader createRecordReader(final FlowFile flowFile, final 
InputStream in, final ComponentLog logger) throws IOException {
+        final RecordSchema schema = useSchemaRegistry ? getSchema(flowFile) : 
null;
+        return new GrokRecordReader(in, grok, schema);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index bdf12f9..458dbd8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -50,10 +50,9 @@ import io.thekraken.grok.api.Match;
 public class GrokRecordReader implements RecordReader {
     private final BufferedReader reader;
     private final Grok grok;
-    private final Map<String, DataType> fieldTypeOverrides;
+    private RecordSchema schema;
 
     private String nextLine;
-    private RecordSchema schema;
 
     static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";
     private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
@@ -74,10 +73,10 @@ public class GrokRecordReader implements RecordReader {
         TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd 
HH:mm:ss", gmt);
     }
 
-    public GrokRecordReader(final InputStream in, final Grok grok, final 
Map<String, DataType> fieldTypeOverrides) {
+    public GrokRecordReader(final InputStream in, final Grok grok, final 
RecordSchema schema) {
         this.reader = new BufferedReader(new InputStreamReader(in));
         this.grok = grok;
-        this.fieldTypeOverrides = fieldTypeOverrides;
+        this.schema = schema;
     }
 
     @Override
@@ -210,46 +209,33 @@ public class GrokRecordReader implements RecordReader {
         if (fieldType == null) {
             return string;
         }
+
+        if (string == null) {
+            return null;
+        }
+
+        // If string is empty then return an empty string if field type is 
STRING. If field type is
+        // anything else, we can't really convert it so return null
+        if (string.isEmpty() && fieldType.getFieldType() != 
RecordFieldType.STRING) {
+            return null;
+        }
+
         switch (fieldType.getFieldType()) {
             case BOOLEAN:
-                if (string.length() == 0) {
-                    return null;
-                }
                 return Boolean.parseBoolean(string);
             case BYTE:
-                if (string.length() == 0) {
-                    return null;
-                }
                 return Byte.parseByte(string);
             case SHORT:
-                if (string.length() == 0) {
-                    return null;
-                }
                 return Short.parseShort(string);
             case INT:
-                if (string.length() == 0) {
-                    return null;
-                }
                 return Integer.parseInt(string);
             case LONG:
-                if (string.length() == 0) {
-                    return null;
-                }
                 return Long.parseLong(string);
             case FLOAT:
-                if (string.length() == 0) {
-                    return null;
-                }
                 return Float.parseFloat(string);
             case DOUBLE:
-                if (string.length() == 0) {
-                    return null;
-                }
                 return Double.parseDouble(string);
             case DATE:
-                if (string.length() == 0) {
-                    return null;
-                }
                 try {
                     Date date = TIME_FORMAT_DATE.parse(string);
                     return new java.sql.Date(date.getTime());
@@ -257,9 +243,6 @@ public class GrokRecordReader implements RecordReader {
                     return null;
                 }
             case TIME:
-                if (string.length() == 0) {
-                    return null;
-                }
                 try {
                     Date date = TIME_FORMAT_TIME.parse(string);
                     return new java.sql.Time(date.getTime());
@@ -267,9 +250,6 @@ public class GrokRecordReader implements RecordReader {
                     return null;
                 }
             case TIMESTAMP:
-                if (string.length() == 0) {
-                    return null;
-                }
                 try {
                     Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                     return new java.sql.Timestamp(date.getTime());
@@ -298,11 +278,7 @@ public class GrokRecordReader implements RecordReader {
                 final Map<String, String> namedGroups = 
GrokUtils.namedGroups(matcher, grokExpression);
                 final String fieldName = namedGroups.get("subname");
 
-                DataType dataType = fieldTypeOverrides.get(fieldName);
-                if (dataType == null) {
-                    dataType = RecordFieldType.STRING.getDataType();
-                }
-
+                DataType dataType = RecordFieldType.STRING.getDataType();
                 final RecordField recordField = new RecordField(fieldName, 
dataType);
                 fields.add(recordField);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 286326a..ad04912 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -19,16 +19,11 @@ package org.apache.nifi.json;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.TimeZone;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -45,7 +40,6 @@ import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
 
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
@@ -57,8 +51,6 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
 
     private boolean firstObjectConsumed = false;
 
-    private static final TimeZone gmt = TimeZone.getTimeZone("GMT");
-
 
     public AbstractJsonRowRecordReader(final InputStream in, final 
ComponentLog logger) throws IOException, MalformedRecordException {
         this.logger = logger;
@@ -136,7 +128,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         }
 
         final RecordSchema childSchema = determineSchema(node);
-        return RecordFieldType.RECORD.getDataType(childSchema);
+        return RecordFieldType.RECORD.getRecordDataType(childSchema);
     }
 
     protected RecordSchema determineSchema(final JsonNode jsonNode) {
@@ -155,111 +147,31 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         return new SimpleRecordSchema(recordFields);
     }
 
-    protected Object convertField(final JsonNode fieldNode, final String 
fieldName, final DataType desiredType) throws IOException, 
MalformedRecordException {
-        if (fieldNode == null || fieldNode.isNull()) {
+    protected Object getRawNodeValue(final JsonNode fieldNode) throws 
IOException {
+        if (fieldNode == null || !fieldNode.isValueNode()) {
             return null;
         }
 
-        switch (desiredType.getFieldType()) {
-            case BOOLEAN:
-                return fieldNode.asBoolean();
-            case BYTE:
-                return (byte) fieldNode.asInt();
-            case CHAR:
-                final String text = fieldNode.asText();
-                if (text.isEmpty()) {
-                    return null;
-                }
-                return text.charAt(0);
-            case DOUBLE:
-                return fieldNode.asDouble();
-            case FLOAT:
-                return (float) fieldNode.asDouble();
-            case INT:
-                return fieldNode.asInt();
-            case LONG:
-                return fieldNode.asLong();
-            case SHORT:
-                return (short) fieldNode.asInt();
-            case STRING:
-                return fieldNode.asText();
-            case DATE: {
-                final String string = fieldNode.asText();
-                if (string.isEmpty()) {
-                    return null;
-                }
-
-                try {
-                    final DateFormat dateFormat = new 
SimpleDateFormat(desiredType.getFormat());
-                    dateFormat.setTimeZone(gmt);
-                    final Date date = dateFormat.parse(string);
-                    return new java.sql.Date(date.getTime());
-                } catch (ParseException e) {
-                    logger.warn("Failed to convert JSON field to Date for 
field {} (value {})", new Object[] {fieldName, string, e});
-                    return null;
-                }
-            }
-            case TIME: {
-                final String string = fieldNode.asText();
-                if (string.isEmpty()) {
-                    return null;
-                }
-
-                try {
-                    final DateFormat dateFormat = new 
SimpleDateFormat(desiredType.getFormat());
-                    dateFormat.setTimeZone(gmt);
-                    final Date date = dateFormat.parse(string);
-                    return new java.sql.Date(date.getTime());
-                } catch (ParseException e) {
-                    logger.warn("Failed to convert JSON field to Time for 
field {} (value {})", new Object[] {fieldName, string, e});
-                    return null;
-                }
-            }
-            case TIMESTAMP: {
-                final String string = fieldNode.asText();
-                if (string.isEmpty()) {
-                    return null;
-                }
-
-                try {
-                    final DateFormat dateFormat = new 
SimpleDateFormat(desiredType.getFormat());
-                    dateFormat.setTimeZone(gmt);
-                    final Date date = dateFormat.parse(string);
-                    return new java.sql.Date(date.getTime());
-                } catch (ParseException e) {
-                    logger.warn("Failed to convert JSON field to Timestamp for 
field {} (value {})", new Object[] {fieldName, string, e});
-                    return null;
-                }
-            }
-            case ARRAY: {
-                final ArrayNode arrayNode = (ArrayNode) fieldNode;
-                final int numElements = arrayNode.size();
-                final Object[] arrayElements = new Object[numElements];
-                int count = 0;
-                for (final JsonNode node : arrayNode) {
-                    final Object converted = convertField(node, fieldName, 
determineFieldType(node));
-                    arrayElements[count++] = converted;
-                }
-
-                return arrayElements;
-            }
-            case RECORD: {
-                if (fieldNode.isObject()) {
-                    final Optional<RecordSchema> childSchema = 
desiredType.getChildRecordSchema();
-                    if (!childSchema.isPresent()) {
-                        return null;
-                    }
-
-                    return convertJsonNodeToRecord(fieldNode, 
childSchema.get());
-                } else {
-                    return fieldNode.toString();
-                }
-            }
+        if (fieldNode.isNumber()) {
+            return fieldNode.getNumberValue();
+        }
+
+        if (fieldNode.isBinary()) {
+            return fieldNode.getBinaryValue();
+        }
+
+        if (fieldNode.isBoolean()) {
+            return fieldNode.getBooleanValue();
+        }
+
+        if (fieldNode.isTextual()) {
+            return fieldNode.getTextValue();
         }
 
-        return fieldNode.toString();
+        return null;
     }
 
+
     private JsonNode getNextJsonNode() throws JsonParseException, IOException, 
MalformedRecordException {
         if (!firstObjectConsumed) {
             firstObjectConsumed = true;
@@ -286,6 +198,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         }
     }
 
+
     @Override
     public void close() throws IOException {
         jsonParser.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
index b43b1c1..467ecf8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
@@ -19,12 +19,11 @@ package org.apache.nifi.json;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.List;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -34,13 +33,15 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.SchemaRegistryRecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 import com.jayway.jsonpath.JsonPath;
 
@@ -50,16 +51,27 @@ import com.jayway.jsonpath.JsonPath;
     + "User-defined properties define the fields that should be extracted from 
the JSON in order to form the fields of a Record. Any JSON field "
     + "that is not extracted via a JSONPath will not be returned in the JSON 
Records.")
 @SeeAlso(JsonTreeReader.class)
-@DynamicProperty(name = "The field name for the record. If it is desirable to 
enforce that the value be coerced into a given type, its type can be included "
-    + "in the name by using a syntax of <field name>:<field type>. For 
example, \"balance:double\".",
+@DynamicProperty(name = "The field name for the record.",
     value="A JSONPath Expression that will be evaluated against each JSON 
record. The result of the JSONPath will be the value of the "
         + "field whose name is the same as the property name.",
     description="User-defined properties identifiy how to extract specific 
fields from a JSON object in order to create a Record",
     supportsExpressionLanguage=false)
-public class JsonPathReader extends AbstractControllerService implements 
RowRecordReaderFactory {
+public class JsonPathReader extends SchemaRegistryRecordReader implements 
RowRecordReaderFactory {
 
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
     private volatile LinkedHashMap<String, JsonPath> jsonPaths;
-    private volatile Map<String, DataType> fieldTypeOverrides;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(DateTimeUtils.DATE_FORMAT);
+        properties.add(DateTimeUtils.TIME_FORMAT);
+        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
+        return properties;
+    }
+
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
@@ -74,7 +86,9 @@ public class JsonPathReader extends AbstractControllerService 
implements RowReco
 
     @OnEnabled
     public void compileJsonPaths(final ConfigurationContext context) {
-        final Map<String, DataType> fieldTypes = new 
HashMap<>(context.getProperties().size());
+        this.dateFormat = 
context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+        this.timeFormat = 
context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+        this.timestampFormat = 
context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
 
         final LinkedHashMap<String, JsonPath> compiled = new LinkedHashMap<>();
         for (final PropertyDescriptor descriptor : 
context.getProperties().keySet()) {
@@ -82,19 +96,13 @@ public class JsonPathReader extends 
AbstractControllerService implements RowReco
                 continue;
             }
 
-            final String fieldName = 
PropertyNameUtil.getFieldName(descriptor.getName());
-            final Optional<DataType> dataTypeOption = 
PropertyNameUtil.getDataType(descriptor.getName());
-            if (dataTypeOption.isPresent()) {
-                fieldTypes.put(fieldName, dataTypeOption.get());
-            }
-
+            final String fieldName = descriptor.getName();
             final String expression = 
context.getProperty(descriptor).getValue();
             final JsonPath jsonPath = JsonPath.compile(expression);
             compiled.put(fieldName, jsonPath);
         }
 
         jsonPaths = compiled;
-        fieldTypeOverrides = fieldTypes;
     }
 
     @Override
@@ -119,8 +127,9 @@ public class JsonPathReader extends 
AbstractControllerService implements RowReco
     }
 
     @Override
-    public RecordReader createRecordReader(final InputStream in, final 
ComponentLog logger) throws IOException, MalformedRecordException {
-        return new JsonPathRowRecordReader(jsonPaths, fieldTypeOverrides, in, 
logger);
+    public RecordReader createRecordReader(final FlowFile flowFile, final 
InputStream in, final ComponentLog logger) throws IOException, 
MalformedRecordException {
+        final RecordSchema schema = getSchema(flowFile);
+        return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, 
dateFormat, timeFormat, timestampFormat);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 9654b97..a0f3c32 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -19,30 +19,23 @@ package org.apache.nifi.json;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.stream.Collectors;
 
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.DataTypeUtils;
 import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.codehaus.jackson.JsonNode;
 
 import com.jayway.jsonpath.Configuration;
@@ -54,23 +47,27 @@ import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
 public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     private static final Configuration STRICT_PROVIDER_CONFIGURATION = 
Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
 
-    private static final String TIME_FORMAT_DATE = "yyyy-MM-dd";
-    private static final String TIME_FORMAT_TIME = "HH:mm:ss";
-    private static final String TIME_FORMAT_TIMESTAMP = "yyyy-MM-dd HH:mm:ss";
-    private static final TimeZone gmt = TimeZone.getTimeZone("GMT");
-
+    private final ComponentLog logger;
     private final LinkedHashMap<String, JsonPath> jsonPaths;
-    private final Map<String, DataType> fieldTypeOverrides;
     private final InputStream in;
     private RecordSchema schema;
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
 
-    public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> 
jsonPaths, final Map<String, DataType> fieldTypeOverrides, final InputStream 
in, final ComponentLog logger)
+    public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> 
jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog 
logger,
+        final String dateFormat, final String timeFormat, final String 
timestampFormat)
         throws MalformedRecordException, IOException {
         super(in, logger);
 
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+
+        this.schema = schema;
         this.jsonPaths = jsonPaths;
-        this.fieldTypeOverrides = fieldTypeOverrides;
         this.in = in;
+        this.logger = logger;
     }
 
     @Override
@@ -80,60 +77,10 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
 
     @Override
     public RecordSchema getSchema() {
-        if (schema != null) {
-            return schema;
-        }
-
-        final Optional<JsonNode> firstNodeOption = getFirstJsonNode();
-
-        final List<RecordField> recordFields = new ArrayList<>();
-        if (firstNodeOption.isPresent()) {
-            final DocumentContext ctx = 
JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(firstNodeOption.get().toString());
-            for (final Map.Entry<String, JsonPath> entry : 
jsonPaths.entrySet()) {
-                final String fieldName = 
PropertyNameUtil.getFieldName(entry.getKey());
-                final JsonPath jsonPath = entry.getValue();
-
-                final DataType dataType;
-                final DataType dataTypeOverride = 
fieldTypeOverrides.get(fieldName);
-                if (dataTypeOverride == null) {
-                    Object value;
-                    try {
-                        value = ctx.read(jsonPath);
-                    } catch (final PathNotFoundException pnfe) {
-                        value = null;
-                    }
-
-                    if (value == null) {
-                        dataType = RecordFieldType.STRING.getDataType();
-                    } else {
-                        dataType = DataTypeUtils.inferDataType(value);
-                    }
-                } else {
-                    dataType = dataTypeOverride;
-                }
-
-                recordFields.add(new RecordField(fieldName, dataType));
-            }
-        }
-
-        // If there are any overridden field types that we didn't find, add as 
the last fields.
-        final Set<String> knownFieldNames = recordFields.stream()
-            .map(f -> f.getFieldName())
-            .collect(Collectors.toSet());
-
-        for (final Map.Entry<String, DataType> entry : 
fieldTypeOverrides.entrySet()) {
-            if (!knownFieldNames.contains(entry.getKey())) {
-                recordFields.add(new RecordField(entry.getKey(), 
entry.getValue()));
-            }
-        }
-
-        schema = new SimpleRecordSchema(recordFields);
         return schema;
     }
 
-
     @Override
-    @SuppressWarnings("unchecked")
     protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema) throws IOException {
         if (jsonNode == null) {
             return null;
@@ -143,138 +90,72 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
         final Map<String, Object> values = new 
HashMap<>(schema.getFieldCount());
 
         for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
+            final String fieldName = entry.getKey();
+            final DataType desiredType = 
schema.getDataType(fieldName).orElse(null);
+            if (desiredType == null) {
+                continue;
+            }
+
             final JsonPath jsonPath = entry.getValue();
 
             Object value;
             try {
                 value = ctx.read(jsonPath);
             } catch (final PathNotFoundException pnfe) {
+                logger.debug("Evaluated JSONPath Expression {} but the path 
was not found; will use a null value", new Object[] {entry.getValue()});
                 value = null;
             }
 
-            final String fieldName = entry.getKey();
-            if (value != null) {
-                final DataType determinedType = 
DataTypeUtils.inferDataType(value);
-                final DataType desiredType = 
schema.getDataType(fieldName).orElse(null);
-
-                if (value instanceof List) {
-                    value = ((List<Object>) value).toArray();
-                } else if (value instanceof Map && desiredType.getFieldType() 
== RecordFieldType.RECORD) {
-                    value = convert(desiredType, value);
-                } else if (desiredType != null && 
!determinedType.equals(desiredType) && shouldConvert(value, 
determinedType.getFieldType())) {
-                    value = convert(desiredType, value);
-                }
-            }
-
+            value = convert(value, desiredType);
             values.put(fieldName, value);
         }
 
         return new MapRecord(schema, values);
     }
 
-    private boolean shouldConvert(final Object value, final RecordFieldType 
determinedType) {
-        return determinedType != null
-            && determinedType != RecordFieldType.ARRAY;
-    }
-
 
-    protected Object convert(final DataType dataType, final Object value) {
-        if (dataType.getFieldType() == RecordFieldType.RECORD && 
dataType.getChildRecordSchema().isPresent() && value instanceof Map) {
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> map = (Map<String, Object>) value;
-            return new MapRecord(dataType.getChildRecordSchema().get(), map);
-        } else {
-            return convertString(dataType, value.toString());
+    @SuppressWarnings("unchecked")
+    protected Object convert(final Object value, final DataType dataType) {
+        if (value == null) {
+            return null;
         }
-    }
 
-    /**
-     * Coerces the given string into the provided data type, if possible
-     *
-     * @param dataType the desired type
-     * @param string the string representation of the value
-     * @return an Object representing the same value as the given string but 
in the requested data type
-     */
-    protected Object convertString(final DataType dataType, final String 
string) {
-        if (dataType == null) {
-            return string;
+        if (value instanceof List) {
+            if (dataType.getFieldType() != RecordFieldType.ARRAY) {
+                throw new IllegalTypeConversionException("Cannot convert value 
[" + value + "] of type Array to " + dataType);
+            }
+
+            final ArrayDataType arrayType = (ArrayDataType) dataType;
+
+            final List<?> list = (List<?>) value;
+            final Object[] coercedValues = new Object[list.size()];
+            int i = 0;
+            for (final Object rawValue : list) {
+                coercedValues[i++] = DataTypeUtils.convertType(rawValue, 
arrayType.getElementType(), dateFormat, timeFormat, timestampFormat);
+            }
+            return coercedValues;
         }
 
-        switch (dataType.getFieldType()) {
-            case BOOLEAN:
-                if (string.length() == 0) {
-                    return null;
-                }
-                return Boolean.parseBoolean(string);
-            case BYTE:
-                if (string.length() == 0) {
-                    return null;
-                }
-                return Byte.parseByte(string);
-            case SHORT:
-                if (string.length() == 0) {
-                    return null;
-                }
-                return Short.parseShort(string);
-            case INT:
-                if (string.length() == 0) {
-                    return null;
-                }
-                return Integer.parseInt(string);
-            case LONG:
-                if (string.length() == 0) {
-                    return null;
-                }
-                return Long.parseLong(string);
-            case FLOAT:
-                if (string.length() == 0) {
-                    return null;
-                }
-                return Float.parseFloat(string);
-            case DOUBLE:
-                if (string.length() == 0) {
-                    return null;
-                }
-                return Double.parseDouble(string);
-            case DATE:
-                if (string.length() == 0) {
-                    return null;
-                }
-                try {
-                    final DateFormat format = new 
SimpleDateFormat(TIME_FORMAT_DATE);
-                    format.setTimeZone(gmt);
-                    Date date = format.parse(string);
-                    return new java.sql.Date(date.getTime());
-                } catch (ParseException e) {
-                    return null;
-                }
-            case TIME:
-                if (string.length() == 0) {
-                    return null;
-                }
-                try {
-                    final DateFormat format = new 
SimpleDateFormat(TIME_FORMAT_TIME);
-                    format.setTimeZone(gmt);
-                    Date date = format.parse(string);
-                    return new java.sql.Time(date.getTime());
-                } catch (ParseException e) {
-                    return null;
-                }
-            case TIMESTAMP:
-                if (string.length() == 0) {
-                    return null;
-                }
-                try {
-                    final DateFormat format = new 
SimpleDateFormat(TIME_FORMAT_TIMESTAMP);
-                    format.setTimeZone(gmt);
-                    Date date = format.parse(string);
-                    return new java.sql.Timestamp(date.getTime());
-                } catch (ParseException e) {
-                    return null;
+        if (dataType.getFieldType() == RecordFieldType.RECORD && value 
instanceof Map) {
+            final RecordDataType recordDataType = (RecordDataType) dataType;
+            final RecordSchema childSchema = recordDataType.getChildSchema();
+
+            final Map<String, Object> rawValues = (Map<String, Object>) value;
+            final Map<String, Object> coercedValues = new HashMap<>();
+
+            for (final Map.Entry<String, Object> entry : rawValues.entrySet()) 
{
+                final String key = entry.getKey();
+                final Optional<DataType> desiredTypeOption = 
childSchema.getDataType(key);
+                if (desiredTypeOption.isPresent()) {
+                    final Object coercedValue = 
DataTypeUtils.convertType(entry.getValue(), desiredTypeOption.get(), 
dateFormat, timeFormat, timestampFormat);
+                    coercedValues.put(key, coercedValue);
                 }
-            case STRING:
-            default:
-                return string;
+            }
+
+            return new MapRecord(childSchema, coercedValues);
+        } else {
+            return DataTypeUtils.convertType(value, dataType, dateFormat, 
timeFormat, timestampFormat);
         }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
index 626f56c..f28c43d 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
@@ -27,18 +27,6 @@ public class JsonPathValidator implements Validator {
 
     @Override
     public ValidationResult validate(final String subject, final String input, 
final ValidationContext context) {
-        if (PropertyNameUtil.hasFieldType(subject) && 
!PropertyNameUtil.isFieldTypeValid(subject)) {
-            final String fieldType = 
PropertyNameUtil.getFieldTypeName(subject).get();
-
-            return new ValidationResult.Builder()
-                .subject(subject)
-                .input(input)
-                .valid(false)
-                .explanation("Invalid field type. If property name contains a 
colon (:) it must use syntax of "
-                    + "<field name>:<field type> but the specified field type 
('" + fieldType + "') is not a valid field type")
-                .build();
-        }
-
         try {
             JsonPath.compile(input);
         } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index dc75a51..d09f135 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -26,14 +26,14 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
-@Tags({"json", "resultset", "writer", "serialize", "record", "row"})
-@CapabilityDescription("Writes the results of a Database ResultSet as a JSON 
Array. Even if the ResultSet "
+@Tags({"json", "resultset", "writer", "serialize", "record", "recordset", 
"row"})
+@CapabilityDescription("Writes the results of a RecordSet as a JSON Array. 
Even if the RecordSet "
     + "consists of a single row, it will be written as an array with a single 
element.")
-public class JsonRecordSetWriter extends AbstractRecordSetWriter implements 
RecordSetWriterFactory {
+public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter 
implements RecordSetWriterFactory {
 
     static final PropertyDescriptor PRETTY_PRINT_JSON = new 
PropertyDescriptor.Builder()
         .name("Pretty Print JSON")

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 2d7072a..1abb1f4 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -19,38 +19,56 @@ package org.apache.nifi.json;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+import org.apache.nifi.serialization.SchemaRegistryRecordReader;
 
 @Tags({"json", "tree", "record", "reader", "parser"})
 @CapabilityDescription("Parses JSON into individual Record objects. The Record 
that is produced will contain all top-level "
-    + "elements of the corresponding JSON Object. If the JSON has nested 
arrays, those values will be represented as an Object array for that field. "
-    + "Nested JSON objects will be represented as a Map. "
+    + "elements of the corresponding JSON Object. "
     + "The root JSON element can be either a single element or an array of 
JSON elements, and each "
-    + "element in that array will be treated as a separate record. If any of 
the elements has a nested array or a nested "
-    + "element, they will be returned as OBJECT or ARRAY types (respectively), 
not flattened out into individual fields. "
-    + "The schema for the record is determined by the first JSON element in 
the array, if the incoming FlowFile is a JSON array. "
-    + "This means that if a field does not exist in the first JSON object, 
then it will be skipped in all subsequent JSON objects. "
-    + "The data type of a field can be overridden by adding a property to "
-    + "the controller service where the name of the property matches the JSON 
field name and the value of the property is "
-    + "the data type to use. If that field does not exist in a JSON element, 
the field will be assumed to be null. "
-    + "See the Usage of the Controller Service for more information.")
+    + "element in that array will be treated as a separate record. "
+    + "If the schema that is configured contains a field that is not present 
in the JSON, a null value will be used. If the JSON contains "
+    + "a field that is not present in the schema, that field will be skipped. "
+    + "See the Usage of the Controller Service for more information and 
examples.")
 @SeeAlso(JsonPathReader.class)
-@DynamicProperty(name = "<name of JSON field>", value = "<data type of JSON 
field>",
-    description = "User-defined properties are used to indicate that the 
values of a specific field should be interpreted as a "
-    + "user-defined data type (e.g., int, double, float, date, etc.)", 
supportsExpressionLanguage = false)
-public class JsonTreeReader extends UserTypeOverrideRowReader implements 
RowRecordReaderFactory {
+public class JsonTreeReader extends SchemaRegistryRecordReader implements 
RowRecordReaderFactory {
+
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(DateTimeUtils.DATE_FORMAT);
+        properties.add(DateTimeUtils.TIME_FORMAT);
+        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
+        return properties;
+    }
+
+    @OnEnabled
+    public void storeFormats(final ConfigurationContext context) {
+        this.dateFormat = 
context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+        this.timeFormat = 
context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+        this.timestampFormat = 
context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+    }
 
     @Override
-    public RecordReader createRecordReader(final InputStream in, final 
ComponentLog logger) throws IOException, MalformedRecordException {
-        return new JsonTreeRowRecordReader(in, logger, 
getFieldTypeOverrides());
+    public RecordReader createRecordReader(final FlowFile flowFile, final 
InputStream in, final ComponentLog logger) throws IOException, 
MalformedRecordException {
+        return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile), 
dateFormat, timeFormat, timestampFormat);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 4a2d212..c8d07f4 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -19,35 +19,40 @@ package org.apache.nifi.json;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
 
 
 public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
-    private final Map<String, DataType> fieldTypeOverrides;
-    private RecordSchema schema;
+    private final RecordSchema schema;
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
 
-    public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final Map<String, DataType> fieldTypeOverrides) throws IOException, 
MalformedRecordException {
+    public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
+        final String dateFormat, final String timeFormat, final String 
timestampFormat) throws IOException, MalformedRecordException {
         super(in, logger);
-        this.fieldTypeOverrides = fieldTypeOverrides;
+        this.schema = schema;
+
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
     }
 
+
     @Override
     protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema) throws IOException, MalformedRecordException {
         if (jsonNode == null) {
@@ -68,48 +73,76 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
         return new MapRecord(schema, values);
     }
 
-
-    @Override
-    public RecordSchema getSchema() {
-        if (schema != null) {
-            return schema;
+    protected Object convertField(final JsonNode fieldNode, final String 
fieldName, final DataType desiredType) throws IOException, 
MalformedRecordException {
+        if (fieldNode == null || fieldNode.isNull()) {
+            return null;
         }
 
-        final List<RecordField> recordFields = new ArrayList<>();
-        final Optional<JsonNode> firstNodeOption = getFirstJsonNode();
-
-        if (firstNodeOption.isPresent()) {
-            final Iterator<Map.Entry<String, JsonNode>> itr = 
firstNodeOption.get().getFields();
-            while (itr.hasNext()) {
-                final Map.Entry<String, JsonNode> entry = itr.next();
-                final String elementName = entry.getKey();
-                final JsonNode node = entry.getValue();
+        switch (desiredType.getFieldType()) {
+            case BOOLEAN:
+                return DataTypeUtils.toBoolean(getRawNodeValue(fieldNode));
+            case BYTE:
+                return DataTypeUtils.toByte(getRawNodeValue(fieldNode));
+            case CHAR:
+                return DataTypeUtils.toCharacter(getRawNodeValue(fieldNode));
+            case DOUBLE:
+                return DataTypeUtils.toDouble(getRawNodeValue(fieldNode));
+            case FLOAT:
+                return DataTypeUtils.toFloat(getRawNodeValue(fieldNode));
+            case INT:
+                return DataTypeUtils.toInteger(getRawNodeValue(fieldNode));
+            case LONG:
+                return DataTypeUtils.toLong(getRawNodeValue(fieldNode));
+            case SHORT:
+                return DataTypeUtils.toShort(getRawNodeValue(fieldNode));
+            case STRING:
+                return DataTypeUtils.toString(getRawNodeValue(fieldNode), 
dateFormat, timeFormat, timestampFormat);
+            case DATE:
+                return DataTypeUtils.toDate(getRawNodeValue(fieldNode), 
dateFormat);
+            case TIME:
+                return DataTypeUtils.toTime(getRawNodeValue(fieldNode), 
timeFormat);
+            case TIMESTAMP:
+                return DataTypeUtils.toTimestamp(getRawNodeValue(fieldNode), 
timestampFormat);
+            case ARRAY: {
+                final ArrayNode arrayNode = (ArrayNode) fieldNode;
+                final int numElements = arrayNode.size();
+                final Object[] arrayElements = new Object[numElements];
+                int count = 0;
+                for (final JsonNode node : arrayNode) {
+                    final DataType elementType;
+                    if (desiredType instanceof ArrayDataType) {
+                        elementType = ((ArrayDataType) 
desiredType).getElementType();
+                    } else {
+                        elementType = determineFieldType(node);
+                    }
+
+                    final Object converted = convertField(node, fieldName, 
elementType);
+                    arrayElements[count++] = converted;
+                }
 
-                DataType dataType;
-                final DataType overriddenDataType = 
fieldTypeOverrides.get(elementName);
-                if (overriddenDataType == null) {
-                    dataType = determineFieldType(node);
+                return arrayElements;
+            }
+            case RECORD: {
+                if (fieldNode.isObject()) {
+                    final RecordSchema childSchema;
+                    if (desiredType instanceof RecordDataType) {
+                        childSchema = ((RecordDataType) 
desiredType).getChildSchema();
+                    } else {
+                        return null;
+                    }
+
+                    return convertJsonNodeToRecord(fieldNode, childSchema);
                 } else {
-                    dataType = overriddenDataType;
+                    return null;
                 }
-
-                recordFields.add(new RecordField(elementName, dataType));
             }
         }
 
-        // If there are any overridden field types that we didn't find, add as 
the last fields.
-        final Set<String> knownFieldNames = recordFields.stream()
-            .map(f -> f.getFieldName())
-            .collect(Collectors.toSet());
-
-        for (final Map.Entry<String, DataType> entry : 
fieldTypeOverrides.entrySet()) {
-            if (!knownFieldNames.contains(entry.getKey())) {
-                recordFields.add(new RecordField(entry.getKey(), 
entry.getValue()));
-            }
-        }
+        return null;
+    }
 
-        schema = new SimpleRecordSchema(recordFields);
+    @Override
+    public RecordSchema getSchema() {
         return schema;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
deleted file mode 100644
index 3b7dcf9..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.json;
-
-import java.util.Optional;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public class PropertyNameUtil {
-
-    public static String getFieldName(final String propertyName) {
-        final int colonIndex = propertyName.indexOf(":");
-        if (colonIndex > -1 && colonIndex < propertyName.length() - 1) {
-            return propertyName.substring(0, colonIndex);
-        }
-
-        return propertyName;
-    }
-
-    public static boolean hasFieldType(final String propertyName) {
-        final int colonIndex = propertyName.indexOf(":");
-        return (colonIndex > -1 && colonIndex < propertyName.length() - 1);
-    }
-
-    public static Optional<String> getFieldTypeName(final String propertyName) 
{
-        if (hasFieldType(propertyName)) {
-            final String[] splits = propertyName.split("\\:");
-            if (splits.length > 1) {
-                return Optional.of(splits[1]);
-            }
-            return Optional.empty();
-        }
-
-        return Optional.empty();
-    }
-
-    public static Optional<String> getFieldFormat(final String propertyName) {
-        final String[] splits = propertyName.split("\\:");
-        if (splits.length != 3) {
-            return Optional.empty();
-        }
-
-        return Optional.of(splits[2]);
-    }
-
-    public static boolean isFieldTypeValid(final String propertyName) {
-        final Optional<String> fieldType = getFieldTypeName(propertyName);
-        if (!fieldType.isPresent()) {
-            return false;
-        }
-
-        final String typeName = fieldType.get();
-        final RecordFieldType recordFieldType = RecordFieldType.of(typeName);
-        return recordFieldType != null;
-    }
-
-    public static Optional<DataType> getDataType(final String propertyName) {
-        if (isFieldTypeValid(propertyName)) {
-            final String typeName = getFieldTypeName(propertyName).get();
-            final RecordFieldType fieldType = RecordFieldType.of(typeName);
-
-            final Optional<String> format = getFieldFormat(propertyName);
-            if (format.isPresent()) {
-                return Optional.of(fieldType.getDataType(format.get()));
-            } else {
-                return Optional.of(fieldType.getDataType());
-            }
-        }
-
-        return Optional.empty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index cf72b19..05895d8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -20,17 +20,10 @@ package org.apache.nifi.json;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigInteger;
-import java.sql.Array;
 import java.sql.SQLException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.DataTypeUtils;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
@@ -38,25 +31,29 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonGenerator;
 
 public class WriteJsonResult implements RecordSetWriter {
-    private final boolean prettyPrint;
-
     private final ComponentLog logger;
+    private final boolean prettyPrint;
     private final JsonFactory factory = new JsonFactory();
-    private final DateFormat dateFormat;
-    private final DateFormat timeFormat;
-    private final DateFormat timestampFormat;
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
 
     public WriteJsonResult(final ComponentLog logger, final boolean 
prettyPrint, final String dateFormat, final String timeFormat, final String 
timestampFormat) {
         this.prettyPrint = prettyPrint;
-        this.dateFormat = new SimpleDateFormat(dateFormat);
-        this.timeFormat = new SimpleDateFormat(timeFormat);
-        this.timestampFormat = new SimpleDateFormat(timestampFormat);
+
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+
         this.logger = logger;
     }
 
@@ -127,26 +124,6 @@ public class WriteJsonResult implements RecordSetWriter {
         }
     }
 
-    private String createDate(final Object value, final DateFormat format) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Date) {
-            return format.format((Date) value);
-        }
-        if (value instanceof java.sql.Date) {
-            return format.format(new Date(((java.sql.Date) value).getTime()));
-        }
-        if (value instanceof java.sql.Time) {
-            return format.format(new Date(((java.sql.Time) value).getTime()));
-        }
-        if (value instanceof java.sql.Timestamp) {
-            return format.format(new Date(((java.sql.Timestamp) 
value).getTime()));
-        }
-
-        return null;
-    }
 
     private void writeValue(final JsonGenerator generator, final Object value, 
final DataType dataType, final boolean moreCols)
         throws JsonGenerationException, IOException, SQLException {
@@ -155,50 +132,46 @@ public class WriteJsonResult implements RecordSetWriter {
             return;
         }
 
-        final DataType resolvedDataType;
-        if (dataType.getFieldType() == RecordFieldType.CHOICE) {
-            resolvedDataType = DataTypeUtils.inferDataType(value);
-        } else {
-            resolvedDataType = dataType;
+        final DataType chosenDataType = dataType.getFieldType() == 
RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) 
dataType) : dataType;
+        final Object coercedValue = DataTypeUtils.convertType(value, 
chosenDataType);
+        if (coercedValue == null) {
+            generator.writeNull();
+            return;
         }
 
-        switch (resolvedDataType.getFieldType()) {
+        switch (chosenDataType.getFieldType()) {
             case DATE:
-                generator.writeString(createDate(value, dateFormat));
-                break;
             case TIME:
-                generator.writeString(createDate(value, timeFormat));
-                break;
             case TIMESTAMP:
-                generator.writeString(createDate(value, timestampFormat));
+                generator.writeString(DataTypeUtils.toString(coercedValue, 
dateFormat, timeFormat, timestampFormat));
                 break;
             case DOUBLE:
-                generator.writeNumber(DataTypeUtils.toDouble(value, 0D));
+                generator.writeNumber(DataTypeUtils.toDouble(coercedValue));
                 break;
             case FLOAT:
-                generator.writeNumber(DataTypeUtils.toFloat(value, 0F));
+                generator.writeNumber(DataTypeUtils.toFloat(coercedValue));
                 break;
             case LONG:
-                generator.writeNumber(DataTypeUtils.toLong(value, 0L));
+                generator.writeNumber(DataTypeUtils.toLong(coercedValue));
                 break;
             case INT:
             case BYTE:
             case SHORT:
-                generator.writeNumber(DataTypeUtils.toInteger(value, 0));
+                generator.writeNumber(DataTypeUtils.toInteger(coercedValue));
                 break;
             case CHAR:
             case STRING:
-                generator.writeString(value.toString());
+                generator.writeString(coercedValue.toString());
                 break;
             case BIGINT:
-                if (value instanceof Long) {
-                    generator.writeNumber(((Long) value).longValue());
+                if (coercedValue instanceof Long) {
+                    generator.writeNumber(((Long) coercedValue).longValue());
                 } else {
-                    generator.writeNumber((BigInteger) value);
+                    generator.writeNumber((BigInteger) coercedValue);
                 }
                 break;
             case BOOLEAN:
-                final String stringValue = value.toString();
+                final String stringValue = coercedValue.toString();
                 if ("true".equalsIgnoreCase(stringValue)) {
                     generator.writeBoolean(true);
                 } else if ("false".equalsIgnoreCase(stringValue)) {
@@ -208,95 +181,34 @@ public class WriteJsonResult implements RecordSetWriter {
                 }
                 break;
             case RECORD: {
-                final Record record = (Record) value;
+                final Record record = (Record) coercedValue;
                 writeRecord(record, generator, gen -> gen.writeStartObject(), 
gen -> gen.writeEndObject());
                 break;
             }
             case ARRAY:
             default:
-                if ("null".equals(value.toString())) {
-                    generator.writeNull();
-                } else if (value instanceof Map) {
-                    final Map<?, ?> map = (Map<?, ?>) value;
-                    generator.writeStartObject();
-
-                    int i = 0;
-                    for (final Map.Entry<?, ?> entry : map.entrySet()) {
-                        generator.writeFieldName(entry.getKey().toString());
-                        final boolean moreEntries = ++i < map.size();
-                        writeValue(generator, entry.getValue(), 
getColType(entry.getValue()), moreEntries);
-                    }
-                    generator.writeEndObject();
-                } else if (value instanceof List) {
-                    final List<?> list = (List<?>) value;
-                    writeArray(list.toArray(), generator);
-                } else if (value instanceof Array) {
-                    final Array array = (Array) value;
-                    final Object[] values = (Object[]) array.getArray();
-                    writeArray(values, generator);
-                } else if (value instanceof Object[]) {
-                    final Object[] values = (Object[]) value;
-                    writeArray(values, generator);
+                if (coercedValue instanceof Object[]) {
+                    final Object[] values = (Object[]) coercedValue;
+                    final ArrayDataType arrayDataType = (ArrayDataType) 
dataType;
+                    final DataType elementType = 
arrayDataType.getElementType();
+                    writeArray(values, generator, elementType);
                 } else {
-                    generator.writeString(value.toString());
+                    generator.writeString(coercedValue.toString());
                 }
                 break;
         }
     }
 
-    private void writeArray(final Object[] values, final JsonGenerator 
generator) throws JsonGenerationException, IOException, SQLException {
+    private void writeArray(final Object[] values, final JsonGenerator 
generator, final DataType elementType) throws JsonGenerationException, 
IOException, SQLException {
         generator.writeStartArray();
         for (int i = 0; i < values.length; i++) {
             final boolean moreEntries = i < values.length - 1;
             final Object element = values[i];
-            writeValue(generator, element, getColType(element), moreEntries);
+            writeValue(generator, element, elementType, moreEntries);
         }
         generator.writeEndArray();
     }
 
-    private DataType getColType(final Object value) {
-        if (value instanceof String) {
-            return RecordFieldType.STRING.getDataType();
-        }
-        if (value instanceof Double) {
-            return RecordFieldType.DOUBLE.getDataType();
-        }
-        if (value instanceof Float) {
-            return RecordFieldType.FLOAT.getDataType();
-        }
-        if (value instanceof Integer) {
-            return RecordFieldType.INT.getDataType();
-        }
-        if (value instanceof Long) {
-            return RecordFieldType.LONG.getDataType();
-        }
-        if (value instanceof BigInteger) {
-            return RecordFieldType.BIGINT.getDataType();
-        }
-        if (value instanceof Boolean) {
-            return RecordFieldType.BOOLEAN.getDataType();
-        }
-        if (value instanceof Byte || value instanceof Short) {
-            return RecordFieldType.INT.getDataType();
-        }
-        if (value instanceof Character) {
-            return RecordFieldType.STRING.getDataType();
-        }
-        if (value instanceof java.util.Date || value instanceof java.sql.Date) 
{
-            return RecordFieldType.DATE.getDataType();
-        }
-        if (value instanceof java.sql.Time) {
-            return RecordFieldType.TIME.getDataType();
-        }
-        if (value instanceof java.sql.Timestamp) {
-            return RecordFieldType.TIMESTAMP.getDataType();
-        }
-        if (value instanceof Object[] || value instanceof List || value 
instanceof Array) {
-            return RecordFieldType.ARRAY.getDataType();
-        }
-
-        return RecordFieldType.RECORD.getDataType();
-    }
 
     @Override
     public String getMimeType() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
deleted file mode 100644
index b58a22e..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public abstract class AbstractRecordSetWriter extends 
AbstractControllerService {
-    static final PropertyDescriptor DATE_FORMAT = new 
PropertyDescriptor.Builder()
-        .name("Date Format")
-        .description("Specifies the format to use when writing out Date 
fields")
-        .expressionLanguageSupported(false)
-        .defaultValue(RecordFieldType.DATE.getDefaultFormat())
-        .addValidator(new SimpleDateFormatValidator())
-        .required(true)
-        .build();
-
-    static final PropertyDescriptor TIME_FORMAT = new 
PropertyDescriptor.Builder()
-        .name("Time Format")
-        .description("Specifies the format to use when writing out Time 
fields")
-        .expressionLanguageSupported(false)
-        .defaultValue(RecordFieldType.TIME.getDefaultFormat())
-        .addValidator(new SimpleDateFormatValidator())
-        .required(true)
-        .build();
-
-    static final PropertyDescriptor TIMESTAMP_FORMAT = new 
PropertyDescriptor.Builder()
-        .name("Timestamp Format")
-        .description("Specifies the format to use when writing out Timestamp 
(date/time) fields")
-        .expressionLanguageSupported(false)
-        .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat())
-        .addValidator(new SimpleDateFormatValidator())
-        .required(true)
-        .build();
-
-    private volatile String dateFormat;
-    private volatile String timeFormat;
-    private volatile String timestampFormat;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Arrays.asList(DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT);
-    }
-
-    @OnEnabled
-    public void captureValues(final ConfigurationContext context) {
-        this.dateFormat = context.getProperty(DATE_FORMAT).getValue();
-        this.timeFormat = context.getProperty(TIME_FORMAT).getValue();
-        this.timestampFormat = 
context.getProperty(TIMESTAMP_FORMAT).getValue();
-    }
-
-    protected String getDateFormat() {
-        return dateFormat;
-    }
-
-    protected String getTimeFormat() {
-        return timeFormat;
-    }
-
-    protected String getTimestampFormat() {
-        return timestampFormat;
-    }
-}

Reply via email to