This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e18d9ce NIFI-6369: This closes #3560. Updated JSON Readers to convert
String values to Date/Time/Timestamp objects when appropriate according to the
schema and the configured pattern NIFI-6939: Fix to WriteJsonRecord to deal
with date/time/timestamp when no format is explicitly set
e18d9ce is described below
commit e18d9ce1e8d576a21630dcd67e9260ff5c3f9bf0
Author: Mark Payne <[email protected]>
AuthorDate: Mon Jul 1 09:50:14 2019 -0400
NIFI-6369: This closes #3560. Updated JSON Readers to convert String values
to Date/Time/Timestamp objects when appropriate according to the schema and the
configured pattern
NIFI-6939: Fix to WriteJsonRecord to deal with date/time/timestamp when no
format is explicitly set
---
.../serialization/record/util/DataTypeUtils.java | 24 ++++++--
.../nifi-standard-processors/pom.xml | 2 +
.../processors/standard/TestValidateRecord.java | 64 ++++++++++++++++++++++
.../resources/TestValidateRecord/timestamp.avsc | 11 ++++
.../resources/TestValidateRecord/timestamp.json | 1 +
.../nifi-record-serialization-services/pom.xml | 1 +
.../nifi/json/AbstractJsonRowRecordReader.java | 55 ++++++++++++++++---
.../apache/nifi/json/JsonPathRowRecordReader.java | 37 ++++++-------
.../apache/nifi/json/JsonTreeRowRecordReader.java | 41 +++++++-------
.../java/org/apache/nifi/json/WriteJsonResult.java | 37 +++++++++++++
.../org/apache/nifi/csv/TestCSVRecordReader.java | 20 ++++---
.../nifi/json/TestJsonPathRowRecordReader.java | 51 +++++++++++------
.../nifi/json/TestJsonTreeRowRecordReader.java | 16 ++++++
.../src/test/resources/json/timestamp.json | 1 +
14 files changed, 285 insertions(+), 76 deletions(-)
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 308cafa..b1daa91 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1116,12 +1116,12 @@ public class DataTypeUtils {
}
if (value instanceof String) {
- try {
- final String string = ((String) value).trim();
- if (string.isEmpty()) {
- return null;
- }
+ final String string = ((String) value).trim();
+ if (string.isEmpty()) {
+ return null;
+ }
+ try {
if (format == null) {
return new Timestamp(Long.parseLong(string));
}
@@ -1130,11 +1130,23 @@ public class DataTypeUtils {
if (dateFormat == null) {
return new Timestamp(Long.parseLong(string));
}
+
final java.util.Date utilDate = dateFormat.parse(string);
return new Timestamp(utilDate.getTime());
} catch (final ParseException e) {
+ final DateFormat dateFormat = format.get();
+ final String formatDescription;
+ if (dateFormat == null) {
+ formatDescription = "Numeric";
+ } else if (dateFormat instanceof SimpleDateFormat) {
+ formatDescription = ((SimpleDateFormat)
dateFormat).toPattern();
+ } else {
+ formatDescription = dateFormat.toString();
+ }
+
throw new IllegalTypeConversionException("Could not convert
value [" + value
- + "] of type java.lang.String to Timestamp for field " +
fieldName + " because the value is not in the expected date format: " + format);
+ + "] of type java.lang.String to Timestamp for field " +
fieldName + " because the value is not in the expected date format: "
+ + formatDescription);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 5a3cee4..340aa46 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -572,6 +572,8 @@
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
<exclude>src/test/resources/TestValidateRecord/nested-map-input.json</exclude>
<exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude>
+
<exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude>
+
<exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
</excludes>
</configuration>
</plugin>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 932f6d1..bbcc9a9 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -27,6 +27,8 @@ import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -41,6 +43,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
@@ -409,4 +412,65 @@ public class TestValidateRecord {
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
}
+
+ @Test
+ public void testValidateJsonTimestamp() throws IOException,
InitializationException {
+ final String validateSchema = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/timestamp.avsc")),
StandardCharsets.UTF_8);
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+ runner.setProperty(jsonReader,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT,
validateSchema);
+ runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT,
"yyyy/MM/dd HH:mm:ss");
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
+ runner.addControllerService("writer", validWriter);
+ runner.setProperty(validWriter, "Schema Write Strategy",
"full-schema-attribute");
+ runner.setProperty(validWriter, DateTimeUtils.TIMESTAMP_FORMAT,
"yyyy/MM/dd HH:mm:ss");
+ runner.enableControllerService(validWriter);
+
+ runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+ runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+ runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
+ runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "writer");
+ runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+
+ runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
+
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+ runner.run();
+
+ runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+ final MockFlowFile validFlowFile =
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+ validFlowFile.assertContentEquals(new
File("src/test/resources/TestValidateRecord/timestamp.json"));
+
+ // Test with a timestamp that has an invalid format.
+ runner.clearTransferState();
+
+ runner.disableControllerService(jsonReader);
+ runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT,
"yyyy-MM-dd HH:mm:ss");
+
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+ runner.enableControllerService(jsonReader);
+
+ runner.run();
+
+ runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+ final MockFlowFile invalidFlowFile =
runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+ invalidFlowFile.assertContentEquals(new
File("src/test/resources/TestValidateRecord/timestamp.json"));
+
+ // Test with an Inferred Schema.
+ runner.disableControllerService(jsonReader);
+ runner.setProperty(jsonReader,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaInferenceUtil.INFER_SCHEMA.getValue());
+ runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT,
"yyyy/MM/dd HH:mm:ss");
+ runner.enableControllerService(jsonReader);
+
+ runner.clearTransferState();
+
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+ runner.run();
+
+ runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+ final MockFlowFile validFlowFileInferredSchema =
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+ validFlowFileInferredSchema.assertContentEquals(new
File("src/test/resources/TestValidateRecord/timestamp.json"));
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
new file mode 100644
index 0000000..3c480ac
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
@@ -0,0 +1,11 @@
+{
+ "name": "ts",
+ "namespace": "nifi",
+ "type": "record",
+ "fields": [{
+ "name": "timestamp",
+ "type": {
+ "type": "long", "logicalType": "timestamp-millis"
+ }
+ }]
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
new file mode 100644
index 0000000..b6a68d3
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
@@ -0,0 +1 @@
+[{"timestamp":"2019/06/27 12:45:28"}]
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index e32ec28..6b984a3 100755
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -154,6 +154,7 @@
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
<exclude>src/test/resources/json/bank-account-oneline.json</exclude>
<exclude>src/test/resources/json/data-types.json</exclude>
+
<exclude>src/test/resources/json/timestamp.json</exclude>
<exclude>src/test/resources/json/json-with-unicode.json</exclude>
<exclude>src/test/resources/json/primitive-type-array.json</exclude>
<exclude>src/test/resources/json/single-bank-account.json</exclude>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 9874a02..69b7fab 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -40,15 +40,20 @@ import org.codehaus.jackson.node.ArrayNode;
import java.io.IOException;
import java.io.InputStream;
+import java.text.DateFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.function.Supplier;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
private final ComponentLog logger;
private final JsonParser jsonParser;
private final JsonNode firstJsonNode;
+ private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+ private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+ private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private boolean firstObjectConsumed = false;
@@ -61,6 +66,14 @@ public abstract class AbstractJsonRowRecordReader implements
RecordReader {
this.logger = logger;
+ final DateFormat df = dateFormat == null ? null :
DataTypeUtils.getDateFormat(dateFormat);
+ final DateFormat tf = timeFormat == null ? null :
DataTypeUtils.getDateFormat(timeFormat);
+ final DateFormat tsf = timestampFormat == null ? null :
DataTypeUtils.getDateFormat(timestampFormat);
+
+ LAZY_DATE_FORMAT = () -> df;
+ LAZY_TIME_FORMAT = () -> tf;
+ LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
try {
jsonParser = jsonFactory.createJsonParser(in);
jsonParser.setCodec(codec);
@@ -80,6 +93,18 @@ public abstract class AbstractJsonRowRecordReader implements
RecordReader {
}
}
+ protected Supplier<DateFormat> getLazyDateFormat() {
+ return LAZY_DATE_FORMAT;
+ }
+
+ protected Supplier<DateFormat> getLazyTimeFormat() {
+ return LAZY_TIME_FORMAT;
+ }
+
+ protected Supplier<DateFormat> getLazyTimestampFormat() {
+ return LAZY_TIMESTAMP_FORMAT;
+ }
+
@Override
public Record nextRecord(final boolean coerceTypes, final boolean
dropUnknownFields) throws IOException, MalformedRecordException {
@@ -99,11 +124,11 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
}
}
- protected Object getRawNodeValue(final JsonNode fieldNode) throws
IOException {
- return getRawNodeValue(fieldNode, null);
+ protected Object getRawNodeValue(final JsonNode fieldNode, final String
fieldName) throws IOException {
+ return getRawNodeValue(fieldNode, null, fieldName);
}
- protected Object getRawNodeValue(final JsonNode fieldNode, final DataType
dataType) throws IOException {
+ protected Object getRawNodeValue(final JsonNode fieldNode, final DataType
dataType, final String fieldName) throws IOException {
if (fieldNode == null || fieldNode.isNull()) {
return null;
}
@@ -121,7 +146,23 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
}
if (fieldNode.isTextual()) {
- return fieldNode.getTextValue();
+ final String textValue = fieldNode.getTextValue();
+ if (dataType == null) {
+ return textValue;
+ }
+
+ switch (dataType.getFieldType()) {
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ try {
+ return DataTypeUtils.convertType(textValue, dataType,
LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+ } catch (final Exception e) {
+ return textValue;
+ }
+ }
+
+ return textValue;
}
if (fieldNode.isArray()) {
@@ -139,7 +180,7 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
}
for (final JsonNode node : arrayNode) {
- final Object value = getRawNodeValue(node, elementDataType);
+ final Object value = getRawNodeValue(node, elementDataType,
fieldName);
arrayElements[count++] = value;
}
@@ -166,7 +207,7 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
while (fieldNames.hasNext()) {
final String childFieldName = fieldNames.next();
- final Object childValue =
getRawNodeValue(fieldNode.get(childFieldName),
possibleSchema.getDataType(childFieldName).orElse(null));
+ final Object childValue =
getRawNodeValue(fieldNode.get(childFieldName),
possibleSchema.getDataType(childFieldName).orElse(null), childFieldName);
childValues.put(childFieldName, childValue);
}
@@ -187,7 +228,7 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
final String childFieldName = fieldNames.next();
final DataType childDataType =
childSchema.getDataType(childFieldName).orElse(null);
- final Object childValue =
getRawNodeValue(fieldNode.get(childFieldName), childDataType);
+ final Object childValue =
getRawNodeValue(fieldNode.get(childFieldName), childDataType, childFieldName);
childValues.put(childFieldName, childValue);
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index a664b88..c977cfd 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -39,14 +39,12 @@ import org.codehaus.jackson.JsonNode;
import java.io.IOException;
import java.io.InputStream;
-import java.text.DateFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.function.Supplier;
public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
private static final Configuration STRICT_PROVIDER_CONFIGURATION =
Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
@@ -56,23 +54,11 @@ public class JsonPathRowRecordReader extends
AbstractJsonRowRecordReader {
private final InputStream in;
private RecordSchema schema;
- private final Supplier<DateFormat> LAZY_DATE_FORMAT;
- private final Supplier<DateFormat> LAZY_TIME_FORMAT;
- private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
-
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath>
jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog
logger,
final String dateFormat, final String timeFormat, final String
timestampFormat)
throws MalformedRecordException, IOException {
super(in, logger, dateFormat, timeFormat, timestampFormat);
- final DateFormat df = dateFormat == null ? null :
DataTypeUtils.getDateFormat(dateFormat);
- final DateFormat tf = timeFormat == null ? null :
DataTypeUtils.getDateFormat(timeFormat);
- final DateFormat tsf = timestampFormat == null ? null :
DataTypeUtils.getDateFormat(timestampFormat);
-
- LAZY_DATE_FORMAT = () -> df;
- LAZY_TIME_FORMAT = () -> tf;
- LAZY_TIMESTAMP_FORMAT = () -> tsf;
-
this.schema = schema;
this.jsonPaths = jsonPaths;
this.in = in;
@@ -123,7 +109,7 @@ public class JsonPathRowRecordReader extends
AbstractJsonRowRecordReader {
value = convert(value, desiredType, fieldName, defaultValue);
} else {
final DataType dataType =
field.map(RecordField::getDataType).orElse(null);
- value = convert(value, dataType);
+ value = convert(value, dataType, fieldName);
}
values.put(fieldName, value);
@@ -134,7 +120,7 @@ public class JsonPathRowRecordReader extends
AbstractJsonRowRecordReader {
@SuppressWarnings("unchecked")
- protected Object convert(final Object value, final DataType dataType) {
+ protected Object convert(final Object value, final DataType dataType,
final String fieldName) {
if (value == null) {
return null;
}
@@ -152,7 +138,7 @@ public class JsonPathRowRecordReader extends
AbstractJsonRowRecordReader {
int i = 0;
for (final Object val : list) {
- array[i++] = convert(val, elementDataType);
+ array[i++] = convert(val, elementDataType, fieldName);
}
return array;
@@ -188,12 +174,25 @@ public class JsonPathRowRecordReader extends
AbstractJsonRowRecordReader {
final RecordField recordField =
childSchema.getField(key).orElse(null);
final DataType childDataType = recordField == null ? null :
recordField.getDataType();
- values.put(key, convert(childValue, childDataType));
+ values.put(key, convert(childValue, childDataType, fieldName));
}
return new MapRecord(childSchema, values);
}
+ if (value instanceof String) {
+ switch (dataType.getFieldType()) {
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ try {
+ return DataTypeUtils.convertType(value, dataType,
getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
+ } catch (final Exception e) {
+ return value;
+ }
+ }
+ }
+
return value;
}
@@ -240,7 +239,7 @@ public class JsonPathRowRecordReader extends
AbstractJsonRowRecordReader {
return new MapRecord(childSchema, coercedValues);
} else {
- return DataTypeUtils.convertType(value, dataType,
LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+ return DataTypeUtils.convertType(value, dataType,
getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 058d9ee..d0172e9 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -36,7 +36,6 @@ import org.codehaus.jackson.node.ArrayNode;
import java.io.IOException;
import java.io.InputStream;
-import java.text.DateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -47,23 +46,11 @@ import java.util.function.Supplier;
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
private final RecordSchema schema;
- private final Supplier<DateFormat> LAZY_DATE_FORMAT;
- private final Supplier<DateFormat> LAZY_TIME_FORMAT;
- private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
-
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String
timestampFormat) throws IOException, MalformedRecordException {
super(in, logger, dateFormat, timeFormat, timestampFormat);
this.schema = schema;
-
- final DateFormat df = dateFormat == null ? null :
DataTypeUtils.getDateFormat(dateFormat);
- final DateFormat tf = timeFormat == null ? null :
DataTypeUtils.getDateFormat(timeFormat);
- final DateFormat tsf = timestampFormat == null ? null :
DataTypeUtils.getDateFormat(timestampFormat);
-
- LAZY_DATE_FORMAT = () -> df;
- LAZY_TIME_FORMAT = () -> tf;
- LAZY_TIMESTAMP_FORMAT = () -> tsf;
}
@@ -97,6 +84,22 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
return null;
}
+ private boolean isDateTimeTimestampType(final RecordField field) {
+ if (field == null) {
+ return false;
+ }
+
+ final RecordFieldType fieldType = field.getDataType().getFieldType();
+ switch (fieldType) {
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ return true;
+ default:
+ return false;
+ }
+ }
+
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final
RecordSchema schema, final String fieldNamePrefix,
final boolean coerceTypes, final boolean dropUnknown) throws
IOException, MalformedRecordException {
@@ -111,13 +114,13 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
final String fieldName = recordField.getFieldName();
- final Object value;
+ Object value;
if (coerceTypes) {
final DataType desiredType = recordField.getDataType();
final String fullFieldName = fieldNamePrefix == null ?
fieldName : fieldNamePrefix + fieldName;
value = convertField(childNode, fullFieldName,
desiredType, dropUnknown);
} else {
- value = getRawNodeValue(childNode, recordField == null ?
null : recordField.getDataType());
+ value = getRawNodeValue(childNode, recordField == null ?
null : recordField.getDataType(), fieldName);
}
values.put(fieldName, value);
@@ -136,7 +139,7 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
final String fullFieldName = fieldNamePrefix == null ?
fieldName : fieldNamePrefix + fieldName;
value = convertField(childNode, fullFieldName,
desiredType, dropUnknown);
} else {
- value = getRawNodeValue(childNode, recordField == null ?
null : recordField.getDataType());
+ value = getRawNodeValue(childNode, recordField == null ?
null : recordField.getDataType(), fieldName);
}
values.put(fieldName, value);
@@ -166,8 +169,8 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
case DATE:
case TIME:
case TIMESTAMP: {
- final Object rawValue = getRawNodeValue(fieldNode);
- final Object converted = DataTypeUtils.convertType(rawValue,
desiredType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT,
fieldName);
+ final Object rawValue = getRawNodeValue(fieldNode, fieldName);
+ final Object converted = DataTypeUtils.convertType(rawValue,
desiredType, getLazyDateFormat(), getLazyTimeFormat(),
getLazyTimestampFormat(), fieldName);
return converted;
}
case MAP: {
@@ -222,7 +225,7 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
}
}
case CHOICE: {
- return DataTypeUtils.convertType(getRawNodeValue(fieldNode,
desiredType), desiredType, fieldName);
+ return DataTypeUtils.convertType(getRawNodeValue(fieldNode,
desiredType, fieldName), desiredType, fieldName);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 2b65eea..dd7eaec 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -37,6 +37,7 @@ import
org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import java.io.IOException;
@@ -60,6 +61,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter
implements RecordSe
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private String mimeType = "application/json";
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
public WriteJsonResult(final ComponentLog logger, final RecordSchema
recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out,
final boolean prettyPrint,
final NullSuppression nullSuppression, final OutputGrouping
outputGrouping, final String dateFormat, final String timeFormat, final String
timestampFormat) throws IOException {
this(logger, recordSchema, schemaAccess, out, prettyPrint,
nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat,
"application/json");
@@ -86,6 +89,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter
implements RecordSe
LAZY_TIMESTAMP_FORMAT = () -> tsf;
final JsonFactory factory = new JsonFactory();
+ factory.setCodec(objectMapper);
+
this.generator = factory.createJsonGenerator(out);
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
@@ -270,9 +275,41 @@ public class WriteJsonResult extends
AbstractRecordSetWriter implements RecordSe
return;
}
+ if (value instanceof java.sql.Time) {
+ final Object formatted = format((java.sql.Time) value,
LAZY_TIME_FORMAT);
+ generator.writeObject(formatted);
+ return;
+ }
+ if (value instanceof java.sql.Date) {
+ final Object formatted = format((java.sql.Date) value,
LAZY_DATE_FORMAT);
+ generator.writeObject(formatted);
+ return;
+ }
+ if (value instanceof java.util.Date) {
+ final Object formatted = format((java.util.Date) value,
LAZY_TIMESTAMP_FORMAT);
+ generator.writeObject(formatted);
+ return;
+ }
+
generator.writeObject(value);
}
+ private Object format(final java.util.Date value, final
Supplier<DateFormat> formatSupplier) {
+ if (value == null) {
+ return null;
+ }
+
+ if (formatSupplier == null) {
+ return value.getTime();
+ }
+ final DateFormat format = formatSupplier.get();
+ if (format == null) {
+ return value.getTime();
+ }
+
+ return format.format(value);
+ }
+
@SuppressWarnings("unchecked")
private void writeValue(final JsonGenerator generator, final Object value,
final String fieldName, final DataType dataType) throws IOException {
if (value == null) {
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index dd25a3b..230a0e3 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -102,18 +102,20 @@ public class TestCSVRecordReader {
fields.add(new RecordField("date",
RecordFieldType.DATE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream bais = new
ByteArrayInputStream(text.getBytes());
- final CSVRecordReader reader = new CSVRecordReader(bais,
Mockito.mock(ComponentLog.class), schema, format, true, false,
+ for (final boolean coerceTypes : new boolean[] {true, false}) {
+ try (final InputStream bais = new
ByteArrayInputStream(text.getBytes());
+ final CSVRecordReader reader = new CSVRecordReader(bais,
Mockito.mock(ComponentLog.class), schema, format, true, false,
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
- final Record record = reader.nextRecord();
- final java.sql.Date date = (Date) record.getValue("date");
- final Calendar calendar =
Calendar.getInstance(TimeZone.getTimeZone("gmt"));
- calendar.setTimeInMillis(date.getTime());
+ final Record record = reader.nextRecord(coerceTypes, false);
+ final java.sql.Date date = (Date) record.getValue("date");
+ final Calendar calendar =
Calendar.getInstance(TimeZone.getTimeZone("gmt"));
+ calendar.setTimeInMillis(date.getTime());
- assertEquals(1983, calendar.get(Calendar.YEAR));
- assertEquals(10, calendar.get(Calendar.MONTH));
- assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+ assertEquals(1983, calendar.get(Calendar.YEAR));
+ assertEquals(10, calendar.get(Calendar.MONTH));
+ assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+ }
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index 5e0da91..d97017c 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -17,21 +17,7 @@
package org.apache.nifi.json;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.stream.Collectors;
-
+import com.jayway.jsonpath.JsonPath;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -45,7 +31,21 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import com.jayway.jsonpath.JsonPath;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class TestJsonPathRowRecordReader {
private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
@@ -167,6 +167,25 @@ public class TestJsonPathRowRecordReader {
}
}
+ @Test
+ public void testTimestampCoercedFromString() throws IOException,
MalformedRecordException {
+ final List<RecordField> recordFields = Collections.singletonList(new
RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+ final LinkedHashMap<String, JsonPath> jsonPaths = new
LinkedHashMap<>();
+ jsonPaths.put("timestamp", JsonPath.compile("$.timestamp"));
+
+ for (final boolean coerceTypes : new boolean[] {true, false}) {
+ try (final InputStream in = new FileInputStream(new
File("src/test/resources/json/timestamp.json"));
+ final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+
+ final Record record = reader.nextRecord(coerceTypes, false);
+ final Object value = record.getValue("timestamp");
+ assertTrue("With coerceTypes set to " + coerceTypes + ", value is not a Timestamp", value instanceof java.sql.Timestamp);
+ }
+ }
+ }
+
@Test
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index adbdc02..759a5ff 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -407,6 +407,22 @@ public class TestJsonTreeRowRecordReader {
@Test
+ public void testTimestampCoercedFromString() throws IOException,
MalformedRecordException {
+ final List<RecordField> recordFields = Collections.singletonList(new
RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+ for (final boolean coerceTypes : new boolean[] {true, false}) {
+ try (final InputStream in = new FileInputStream(new
File("src/test/resources/json/timestamp.json"));
+ final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema,
dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+
+ final Record record = reader.nextRecord(coerceTypes, false);
+ final Object value = record.getValue("timestamp");
+ assertTrue("With coerceTypes set to " + coerceTypes + ", value is not a Timestamp", value instanceof java.sql.Timestamp);
+ }
+ }
+ }
+
+ @Test
public void testSingleJsonElement() throws IOException,
MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json
new file mode 100644
index 0000000..ee5d90f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json
@@ -0,0 +1 @@
+{"timestamp": "2019/06/27 13:04:04"}
\ No newline at end of file