This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e18d9ce  NIFI-6369: This closes #3560. Updated JSON Readers to convert
                  String values to Date/Time/Timestamp objects when appropriate
                  according to the schema and the configured pattern
                  NIFI-6939: Fix to WriteJsonRecord to deal with
                  date/time/timestamp when no format is explicitly set
e18d9ce is described below

commit e18d9ce1e8d576a21630dcd67e9260ff5c3f9bf0
Author: Mark Payne <[email protected]>
AuthorDate: Mon Jul 1 09:50:14 2019 -0400

    NIFI-6369: This closes #3560. Updated JSON Readers to convert String values
    to Date/Time/Timestamp objects when appropriate according to the schema and
    the configured pattern
    NIFI-6939: Fix to WriteJsonRecord to deal with date/time/timestamp when no
    format is explicitly set
---
 .../serialization/record/util/DataTypeUtils.java   | 24 ++++++--
 .../nifi-standard-processors/pom.xml               |  2 +
 .../processors/standard/TestValidateRecord.java    | 64 ++++++++++++++++++++++
 .../resources/TestValidateRecord/timestamp.avsc    | 11 ++++
 .../resources/TestValidateRecord/timestamp.json    |  1 +
 .../nifi-record-serialization-services/pom.xml     |  1 +
 .../nifi/json/AbstractJsonRowRecordReader.java     | 55 ++++++++++++++++---
 .../apache/nifi/json/JsonPathRowRecordReader.java  | 37 ++++++-------
 .../apache/nifi/json/JsonTreeRowRecordReader.java  | 41 +++++++-------
 .../java/org/apache/nifi/json/WriteJsonResult.java | 37 +++++++++++++
 .../org/apache/nifi/csv/TestCSVRecordReader.java   | 20 ++++---
 .../nifi/json/TestJsonPathRowRecordReader.java     | 51 +++++++++++------
 .../nifi/json/TestJsonTreeRowRecordReader.java     | 16 ++++++
 .../src/test/resources/json/timestamp.json         |  1 +
 14 files changed, 285 insertions(+), 76 deletions(-)

diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 308cafa..b1daa91 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1116,12 +1116,12 @@ public class DataTypeUtils {
         }
 
         if (value instanceof String) {
-            try {
-                final String string = ((String) value).trim();
-                if (string.isEmpty()) {
-                    return null;
-                }
+            final String string = ((String) value).trim();
+            if (string.isEmpty()) {
+                return null;
+            }
 
+            try {
                 if (format == null) {
                     return new Timestamp(Long.parseLong(string));
                 }
@@ -1130,11 +1130,23 @@ public class DataTypeUtils {
                 if (dateFormat == null) {
                     return new Timestamp(Long.parseLong(string));
                 }
+
                 final java.util.Date utilDate = dateFormat.parse(string);
                 return new Timestamp(utilDate.getTime());
             } catch (final ParseException e) {
+                final DateFormat dateFormat = format.get();
+                final String formatDescription;
+                if (dateFormat == null) {
+                    formatDescription = "Numeric";
+                } else if (dateFormat instanceof SimpleDateFormat) {
+                    formatDescription = ((SimpleDateFormat) 
dateFormat).toPattern();
+                } else {
+                    formatDescription = dateFormat.toString();
+                }
+
                 throw new IllegalTypeConversionException("Could not convert 
value [" + value
-                    + "] of type java.lang.String to Timestamp for field " + 
fieldName + " because the value is not in the expected date format: " + format);
+                    + "] of type java.lang.String to Timestamp for field " + 
fieldName + " because the value is not in the expected date format: "
+                    + formatDescription);
             }
         }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 5a3cee4..340aa46 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -572,6 +572,8 @@
                         
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
                         
<exclude>src/test/resources/TestValidateRecord/nested-map-input.json</exclude>
                         
<exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude>
+                        
<exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude>
+                        
<exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 932f6d1..bbcc9a9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -27,6 +27,8 @@ import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -41,6 +43,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
@@ -409,4 +412,65 @@ public class TestValidateRecord {
         runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
     }
 
+
+    @Test
+    public void testValidateJsonTimestamp() throws IOException, 
InitializationException {
+        final String validateSchema = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/timestamp.avsc")),
 StandardCharsets.UTF_8);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, 
validateSchema);
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, 
"yyyy/MM/dd HH:mm:ss");
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", validWriter);
+        runner.setProperty(validWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.setProperty(validWriter, DateTimeUtils.TIMESTAMP_FORMAT, 
"yyyy/MM/dd HH:mm:ss");
+        runner.enableControllerService(validWriter);
+
+        runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+        runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
+        runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+
+        runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
+        
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+        final MockFlowFile validFlowFile = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+        validFlowFile.assertContentEquals(new 
File("src/test/resources/TestValidateRecord/timestamp.json"));
+
+        // Test with a timestamp that has an invalid format.
+        runner.clearTransferState();
+
+        runner.disableControllerService(jsonReader);
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, 
"yyyy-MM-dd HH:mm:ss");
+        
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+        runner.enableControllerService(jsonReader);
+
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+        final MockFlowFile invalidFlowFile = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+        invalidFlowFile.assertContentEquals(new 
File("src/test/resources/TestValidateRecord/timestamp.json"));
+
+        // Test with an Inferred Schema.
+        runner.disableControllerService(jsonReader);
+        runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA.getValue());
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, 
"yyyy/MM/dd HH:mm:ss");
+        runner.enableControllerService(jsonReader);
+
+        runner.clearTransferState();
+        
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+        final MockFlowFile validFlowFileInferredSchema = 
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+        validFlowFileInferredSchema.assertContentEquals(new 
File("src/test/resources/TestValidateRecord/timestamp.json"));
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
new file mode 100644
index 0000000..3c480ac
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
@@ -0,0 +1,11 @@
+{
+  "name": "ts",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [{
+      "name": "timestamp",
+      "type": {
+          "type": "long", "logicalType": "timestamp-millis"
+       }
+   }]
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
new file mode 100644
index 0000000..b6a68d3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
@@ -0,0 +1 @@
+[{"timestamp":"2019/06/27 12:45:28"}]
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index e32ec28..6b984a3 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -154,6 +154,7 @@
                         
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
                         
<exclude>src/test/resources/json/bank-account-oneline.json</exclude>
                         
<exclude>src/test/resources/json/data-types.json</exclude>
+                        
<exclude>src/test/resources/json/timestamp.json</exclude>
                         
<exclude>src/test/resources/json/json-with-unicode.json</exclude>
                         
<exclude>src/test/resources/json/primitive-type-array.json</exclude>
                         
<exclude>src/test/resources/json/single-bank-account.json</exclude>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 9874a02..69b7fab 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -40,15 +40,20 @@ import org.codehaus.jackson.node.ArrayNode;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.text.DateFormat;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
     private final ComponentLog logger;
     private final JsonParser jsonParser;
     private final JsonNode firstJsonNode;
+    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
 
     private boolean firstObjectConsumed = false;
 
@@ -61,6 +66,14 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
 
         this.logger = logger;
 
+        final DateFormat df = dateFormat == null ? null : 
DataTypeUtils.getDateFormat(dateFormat);
+        final DateFormat tf = timeFormat == null ? null : 
DataTypeUtils.getDateFormat(timeFormat);
+        final DateFormat tsf = timestampFormat == null ? null : 
DataTypeUtils.getDateFormat(timestampFormat);
+
+        LAZY_DATE_FORMAT = () -> df;
+        LAZY_TIME_FORMAT = () -> tf;
+        LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
         try {
             jsonParser = jsonFactory.createJsonParser(in);
             jsonParser.setCodec(codec);
@@ -80,6 +93,18 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
         }
     }
 
+    protected Supplier<DateFormat> getLazyDateFormat() {
+        return LAZY_DATE_FORMAT;
+    }
+
+    protected Supplier<DateFormat> getLazyTimeFormat() {
+        return LAZY_TIME_FORMAT;
+    }
+
+    protected Supplier<DateFormat> getLazyTimestampFormat() {
+        return LAZY_TIMESTAMP_FORMAT;
+    }
+
 
     @Override
     public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
@@ -99,11 +124,11 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         }
     }
 
-    protected Object getRawNodeValue(final JsonNode fieldNode) throws 
IOException {
-        return getRawNodeValue(fieldNode, null);
+    protected Object getRawNodeValue(final JsonNode fieldNode, final String 
fieldName) throws IOException {
+        return getRawNodeValue(fieldNode, null, fieldName);
     }
 
-    protected Object getRawNodeValue(final JsonNode fieldNode, final DataType 
dataType) throws IOException {
+    protected Object getRawNodeValue(final JsonNode fieldNode, final DataType 
dataType, final String fieldName) throws IOException {
         if (fieldNode == null || fieldNode.isNull()) {
             return null;
         }
@@ -121,7 +146,23 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         }
 
         if (fieldNode.isTextual()) {
-            return fieldNode.getTextValue();
+            final String textValue = fieldNode.getTextValue();
+            if (dataType == null) {
+                return textValue;
+            }
+
+            switch (dataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    try {
+                        return DataTypeUtils.convertType(textValue, dataType, 
LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                    } catch (final Exception e) {
+                        return textValue;
+                    }
+            }
+
+            return textValue;
         }
 
         if (fieldNode.isArray()) {
@@ -139,7 +180,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
             }
 
             for (final JsonNode node : arrayNode) {
-                final Object value = getRawNodeValue(node, elementDataType);
+                final Object value = getRawNodeValue(node, elementDataType, 
fieldName);
                 arrayElements[count++] = value;
             }
 
@@ -166,7 +207,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
                     while (fieldNames.hasNext()) {
                         final String childFieldName = fieldNames.next();
 
-                        final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), 
possibleSchema.getDataType(childFieldName).orElse(null));
+                        final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), 
possibleSchema.getDataType(childFieldName).orElse(null), childFieldName);
                         childValues.put(childFieldName, childValue);
                     }
 
@@ -187,7 +228,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
                 final String childFieldName = fieldNames.next();
 
                 final DataType childDataType = 
childSchema.getDataType(childFieldName).orElse(null);
-                final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), childDataType);
+                final Object childValue = 
getRawNodeValue(fieldNode.get(childFieldName), childDataType, childFieldName);
                 childValues.put(childFieldName, childValue);
             }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index a664b88..c977cfd 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -39,14 +39,12 @@ import org.codehaus.jackson.JsonNode;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.text.DateFormat;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
 
 public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     private static final Configuration STRICT_PROVIDER_CONFIGURATION = 
Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
@@ -56,23 +54,11 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
     private final InputStream in;
     private RecordSchema schema;
 
-    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
-
     public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> 
jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog 
logger,
                 final String dateFormat, final String timeFormat, final String 
timestampFormat)
                 throws MalformedRecordException, IOException {
         super(in, logger, dateFormat, timeFormat, timestampFormat);
 
-        final DateFormat df = dateFormat == null ? null : 
DataTypeUtils.getDateFormat(dateFormat);
-        final DateFormat tf = timeFormat == null ? null : 
DataTypeUtils.getDateFormat(timeFormat);
-        final DateFormat tsf = timestampFormat == null ? null : 
DataTypeUtils.getDateFormat(timestampFormat);
-
-        LAZY_DATE_FORMAT = () -> df;
-        LAZY_TIME_FORMAT = () -> tf;
-        LAZY_TIMESTAMP_FORMAT = () -> tsf;
-
         this.schema = schema;
         this.jsonPaths = jsonPaths;
         this.in = in;
@@ -123,7 +109,7 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
                 value = convert(value, desiredType, fieldName, defaultValue);
             } else {
                 final DataType dataType = 
field.map(RecordField::getDataType).orElse(null);
-                value = convert(value, dataType);
+                value = convert(value, dataType, fieldName);
             }
 
             values.put(fieldName, value);
@@ -134,7 +120,7 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
 
 
     @SuppressWarnings("unchecked")
-    protected Object convert(final Object value, final DataType dataType) {
+    protected Object convert(final Object value, final DataType dataType, 
final String fieldName) {
         if (value == null) {
             return null;
         }
@@ -152,7 +138,7 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
 
             int i = 0;
             for (final Object val : list) {
-                array[i++] = convert(val, elementDataType);
+                array[i++] = convert(val, elementDataType, fieldName);
             }
 
             return array;
@@ -188,12 +174,25 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
                 final RecordField recordField = 
childSchema.getField(key).orElse(null);
                 final DataType childDataType = recordField == null ? null : 
recordField.getDataType();
 
-                values.put(key, convert(childValue, childDataType));
+                values.put(key, convert(childValue, childDataType, fieldName));
             }
 
             return new MapRecord(childSchema, values);
         }
 
+        if (value instanceof String) {
+            switch (dataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    try {
+                        return DataTypeUtils.convertType(value, dataType, 
getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
+                    } catch (final Exception e) {
+                        return value;
+                    }
+            }
+        }
+
         return value;
     }
 
@@ -240,7 +239,7 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
 
             return new MapRecord(childSchema, coercedValues);
         } else {
-            return DataTypeUtils.convertType(value, dataType, 
LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+            return DataTypeUtils.convertType(value, dataType, 
getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 058d9ee..d0172e9 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -36,7 +36,6 @@ import org.codehaus.jackson.node.ArrayNode;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -47,23 +46,11 @@ import java.util.function.Supplier;
 public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
     private final RecordSchema schema;
 
-    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
-
 
     public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
         final String dateFormat, final String timeFormat, final String 
timestampFormat) throws IOException, MalformedRecordException {
         super(in, logger, dateFormat, timeFormat, timestampFormat);
         this.schema = schema;
-
-        final DateFormat df = dateFormat == null ? null : 
DataTypeUtils.getDateFormat(dateFormat);
-        final DateFormat tf = timeFormat == null ? null : 
DataTypeUtils.getDateFormat(timeFormat);
-        final DateFormat tsf = timestampFormat == null ? null : 
DataTypeUtils.getDateFormat(timestampFormat);
-
-        LAZY_DATE_FORMAT = () -> df;
-        LAZY_TIME_FORMAT = () -> tf;
-        LAZY_TIMESTAMP_FORMAT = () -> tsf;
     }
 
 
@@ -97,6 +84,22 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
         return null;
     }
 
+    private boolean isDateTimeTimestampType(final RecordField field) {
+        if (field == null) {
+            return false;
+        }
+
+        final RecordFieldType fieldType = field.getDataType().getFieldType();
+        switch (fieldType) {
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                return true;
+            default:
+                return false;
+        }
+    }
+
     private Record convertJsonNodeToRecord(final JsonNode jsonNode, final 
RecordSchema schema, final String fieldNamePrefix,
             final boolean coerceTypes, final boolean dropUnknown) throws 
IOException, MalformedRecordException {
 
@@ -111,13 +114,13 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
 
                 final String fieldName = recordField.getFieldName();
 
-                final Object value;
+                Object value;
                 if (coerceTypes) {
                     final DataType desiredType = recordField.getDataType();
                     final String fullFieldName = fieldNamePrefix == null ? 
fieldName : fieldNamePrefix + fieldName;
                     value = convertField(childNode, fullFieldName, 
desiredType, dropUnknown);
                 } else {
-                    value = getRawNodeValue(childNode, recordField == null ? 
null : recordField.getDataType());
+                    value = getRawNodeValue(childNode, recordField == null ? 
null : recordField.getDataType(), fieldName);
                 }
 
                 values.put(fieldName, value);
@@ -136,7 +139,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                     final String fullFieldName = fieldNamePrefix == null ? 
fieldName : fieldNamePrefix + fieldName;
                     value = convertField(childNode, fullFieldName, 
desiredType, dropUnknown);
                 } else {
-                    value = getRawNodeValue(childNode, recordField == null ? 
null : recordField.getDataType());
+                    value = getRawNodeValue(childNode, recordField == null ? 
null : recordField.getDataType(), fieldName);
                 }
 
                 values.put(fieldName, value);
@@ -166,8 +169,8 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
             case DATE:
             case TIME:
             case TIMESTAMP: {
-                final Object rawValue = getRawNodeValue(fieldNode);
-                final Object converted = DataTypeUtils.convertType(rawValue, 
desiredType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, 
fieldName);
+                final Object rawValue = getRawNodeValue(fieldNode, fieldName);
+                final Object converted = DataTypeUtils.convertType(rawValue, 
desiredType, getLazyDateFormat(), getLazyTimeFormat(), 
getLazyTimestampFormat(), fieldName);
                 return converted;
             }
             case MAP: {
@@ -222,7 +225,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                 }
             }
             case CHOICE: {
-                return DataTypeUtils.convertType(getRawNodeValue(fieldNode, 
desiredType), desiredType, fieldName);
+                return DataTypeUtils.convertType(getRawNodeValue(fieldNode, 
desiredType, fieldName), desiredType, fieldName);
             }
         }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 2b65eea..dd7eaec 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -37,6 +37,7 @@ import 
org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.util.MinimalPrettyPrinter;
 
 import java.io.IOException;
@@ -60,6 +61,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter 
implements RecordSe
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
     private String mimeType = "application/json";
 
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
     public WriteJsonResult(final ComponentLog logger, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, 
final boolean prettyPrint,
             final NullSuppression nullSuppression, final OutputGrouping 
outputGrouping, final String dateFormat, final String timeFormat, final String 
timestampFormat) throws IOException {
         this(logger, recordSchema, schemaAccess, out, prettyPrint, 
nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, 
"application/json");
@@ -86,6 +89,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
 
         final JsonFactory factory = new JsonFactory();
+        factory.setCodec(objectMapper);
+
         this.generator = factory.createJsonGenerator(out);
         if (prettyPrint) {
             generator.useDefaultPrettyPrinter();
@@ -270,9 +275,41 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
             return;
         }
 
+        if (value instanceof java.sql.Time) {
+            final Object formatted = format((java.sql.Time) value, LAZY_TIME_FORMAT);
+            generator.writeObject(formatted);
+            return;
+        }
+        if (value instanceof java.sql.Date) {
+            final Object formatted = format((java.sql.Date) value, LAZY_DATE_FORMAT);
+            generator.writeObject(formatted);
+            return;
+        }
+        if (value instanceof java.util.Date) {
+            final Object formatted = format((java.util.Date) value, LAZY_TIMESTAMP_FORMAT);
+            generator.writeObject(formatted);
+            return;
+        }
+
         generator.writeObject(value);
     }
 
+    private Object format(final java.util.Date value, final Supplier<DateFormat> formatSupplier) {
+        if (value == null) {
+            return null;
+        }
+
+        if (formatSupplier == null) {
+            return value.getTime();
+        }
+        final DateFormat format = formatSupplier.get();
+        if (format == null) {
+            return value.getTime();
+        }
+
+        return format.format(value);
+    }
+
     @SuppressWarnings("unchecked")
     private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
         if (value == null) {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index dd25a3b..230a0e3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -102,18 +102,20 @@ public class TestCSVRecordReader {
         fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
-             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
+                 final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
                      "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
 
-            final Record record = reader.nextRecord();
-            final java.sql.Date date = (Date) record.getValue("date");
-            final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
-            calendar.setTimeInMillis(date.getTime());
+                final Record record = reader.nextRecord(coerceTypes, false);
+                final java.sql.Date date = (Date) record.getValue("date");
+                final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
+                calendar.setTimeInMillis(date.getTime());
 
-            assertEquals(1983, calendar.get(Calendar.YEAR));
-            assertEquals(10, calendar.get(Calendar.MONTH));
-            assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+                assertEquals(1983, calendar.get(Calendar.YEAR));
+                assertEquals(10, calendar.get(Calendar.MONTH));
+                assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index 5e0da91..d97017c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -17,21 +17,7 @@
 
 package org.apache.nifi.json;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.stream.Collectors;
-
+import com.jayway.jsonpath.JsonPath;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -45,7 +31,21 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.jayway.jsonpath.JsonPath;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestJsonPathRowRecordReader {
     private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
@@ -167,6 +167,25 @@ public class TestJsonPathRowRecordReader {
         }
     }
 
+    @Test
+    public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+        final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+        final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>();
+        jsonPaths.put("timestamp", JsonPath.compile("$.timestamp"));
+
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json"));
+                 final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+
+                final Record record = reader.nextRecord(coerceTypes, false);
+                final Object value = record.getValue("timestamp");
+                assertTrue("With coerceTypes set to " + coerceTypes + ", value is not a Timestamp", value instanceof java.sql.Timestamp);
+            }
+        }
+    }
+
 
 
     @Test
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index adbdc02..759a5ff 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -407,6 +407,22 @@ public class TestJsonTreeRowRecordReader {
 
 
     @Test
+    public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+        final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json"));
+                 final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+
+                final Record record = reader.nextRecord(coerceTypes, false);
+                final Object value = record.getValue("timestamp");
+                assertTrue("With coerceTypes set to " + coerceTypes + ", value is not a Timestamp", value instanceof java.sql.Timestamp);
+            }
+        }
+    }
+
+    @Test
     public void testSingleJsonElement() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json
new file mode 100644
index 0000000..ee5d90f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json
@@ -0,0 +1 @@
+{"timestamp": "2019/06/27 13:04:04"}
\ No newline at end of file

Reply via email to