http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java index 79c602d..e0eb813 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java @@ -23,6 +23,8 @@ import java.io.OutputStreamWriter; import java.util.Collections; import java.util.Optional; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; @@ -31,14 +33,14 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.stream.io.NonCloseableOutputStream; -import au.com.bytecode.opencsv.CSVWriter; - public class WriteCSVResult implements RecordSetWriter { + private final CSVFormat csvFormat; private final String dateFormat; private final String timeFormat; private final String timestampFormat; - public WriteCSVResult(final String dateFormat, final String timeFormat, final String timestampFormat) { + public WriteCSVResult(final CSVFormat csvFormat, final String dateFormat, final String timeFormat, final String timestampFormat) { + this.csvFormat = csvFormat; this.dateFormat = dateFormat; this.timeFormat = timeFormat; this.timestampFormat = timestampFormat; @@ -66,24 +68,25 @@ public class WriteCSVResult implements RecordSetWriter { @Override public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException { int count = 0; + + final RecordSchema schema = rs.getSchema(); + final String[] columnNames = schema.getFieldNames().toArray(new String[0]); + final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames); + try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut); final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable); - final CSVWriter writer = new CSVWriter(streamWriter)) { + final CSVPrinter printer = new CSVPrinter(streamWriter, formatWithHeader)) { try { - final RecordSchema schema = rs.getSchema(); - final String[] columnNames = schema.getFieldNames().toArray(new String[0]); - writer.writeNext(columnNames); - Record record; while ((record = rs.next()) != null) { - final String[] colVals = new String[schema.getFieldCount()]; + final Object[] colVals = new Object[schema.getFieldCount()]; int i = 0; for (final String fieldName : schema.getFieldNames()) { colVals[i++] = record.getAsString(fieldName, getFormat(record, fieldName)); } - writer.writeNext(colVals); + printer.printRecord(colVals); count++; } } catch (final Exception e) { @@ -96,22 +99,20 @@ public class WriteCSVResult implements RecordSetWriter { @Override public WriteResult write(final Record record, final OutputStream rawOut) throws IOException { + try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut); final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable); - final CSVWriter writer = new CSVWriter(streamWriter)) { + final CSVPrinter printer = new CSVPrinter(streamWriter, csvFormat)) { try { final RecordSchema schema = record.getSchema(); - final String[] columnNames = schema.getFieldNames().toArray(new String[0]); - writer.writeNext(columnNames); - - final String[] colVals = new String[schema.getFieldCount()]; + final Object[] colVals = new Object[schema.getFieldCount()]; int i = 0; for (final String fieldName : schema.getFieldNames()) { colVals[i++] = record.getAsString(fieldName, getFormat(record, fieldName)); } - writer.writeNext(colVals); + printer.printRecord(colVals); } catch (final Exception e) { throw new IOException("Failed to serialize results", e); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index f72d5d5..f444b8a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -29,11 +29,13 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RowRecordReaderFactory; -import org.apache.nifi.serialization.UserTypeOverrideRowReader; +import org.apache.nifi.serialization.SchemaRegistryRecordReader; +import org.apache.nifi.serialization.record.RecordSchema; import io.thekraken.grok.api.Grok; import io.thekraken.grok.api.exception.GrokException; @@ -45,9 +47,11 @@ import io.thekraken.grok.api.exception.GrokException; + "If a line in the input does not match the expected message pattern, the line of text is considered to be part of the previous " + "message, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part " + "of the previous message but is added to the 'STACK_TRACE' field of the Record. If a record has no stack trace, it will have a NULL value " - + "for the STACK_TRACE field.") -public class GrokReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory { + + "for the STACK_TRACE field. All fields that are parsed are considered to be of type String by default. If there is need to change the type of a field, " + + "this can be accomplished by configuring the Schema Registry to use and adding the appropriate schema.") +public class GrokReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory { private volatile Grok grok; + private volatile boolean useSchemaRegistry; private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt"; @@ -60,6 +64,7 @@ public class GrokReader extends UserTypeOverrideRowReader implements RowRecordRe .expressionLanguageSupported(true) .required(false) .build(); + static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() .name("Grok Expression") .description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. " @@ -70,7 +75,7 @@ public class GrokReader extends UserTypeOverrideRowReader implements RowRecordRe @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(PATTERN_FILE); properties.add(GROK_EXPRESSION); return properties; @@ -86,14 +91,21 @@ public class GrokReader extends UserTypeOverrideRowReader implements RowRecordRe } if (context.getProperty(PATTERN_FILE).isSet()) { - grok.addPatternFromFile(context.getProperty(PATTERN_FILE).getValue()); + grok.addPatternFromFile(context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue()); } grok.compile(context.getProperty(GROK_EXPRESSION).getValue()); + useSchemaRegistry = context.getProperty(OPTIONAL_SCHEMA_NAME).isSet() && context.getProperty(OPTIONAL_SCHEMA_REGISTRY).isSet(); + } + + @Override + protected boolean isSchemaRequired() { + return false; } @Override - public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException { - return new GrokRecordReader(in, grok, getFieldTypeOverrides()); + public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException { + final RecordSchema schema = useSchemaRegistry ? getSchema(flowFile) : null; + return new GrokRecordReader(in, grok, schema); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java index bdf12f9..458dbd8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java @@ -50,10 +50,9 @@ import io.thekraken.grok.api.Match; public class GrokRecordReader implements RecordReader { private final BufferedReader reader; private final Grok grok; - private final Map<String, DataType> fieldTypeOverrides; + private RecordSchema schema; private String nextLine; - private RecordSchema schema; static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE"; private static final Pattern STACK_TRACE_PATTERN = Pattern.compile( @@ -74,10 +73,10 @@ public class GrokRecordReader implements RecordReader { TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt); } - public GrokRecordReader(final InputStream in, final Grok grok, final Map<String, DataType> fieldTypeOverrides) { + public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema) { this.reader = new BufferedReader(new InputStreamReader(in)); this.grok = grok; - this.fieldTypeOverrides = fieldTypeOverrides; + this.schema = schema; } @Override @@ -210,46 +209,33 @@ public class GrokRecordReader implements RecordReader { if (fieldType == null) { return string; } + + if (string == null) { + return null; + } + + // If string is empty then return an empty string if field type is STRING. If field type is + // anything else, we can't really convert it so return null + if (string.isEmpty() && fieldType.getFieldType() != RecordFieldType.STRING) { + return null; + } + switch (fieldType.getFieldType()) { case BOOLEAN: - if (string.length() == 0) { - return null; - } return Boolean.parseBoolean(string); case BYTE: - if (string.length() == 0) { - return null; - } return Byte.parseByte(string); case SHORT: - if (string.length() == 0) { - return null; - } return Short.parseShort(string); case INT: - if (string.length() == 0) { - return null; - } return Integer.parseInt(string); case LONG: - if (string.length() == 0) { - return null; - } return Long.parseLong(string); case FLOAT: - if (string.length() == 0) { - return null; - } return Float.parseFloat(string); case DOUBLE: - if (string.length() == 0) { - return null; - } return Double.parseDouble(string); case DATE: - if (string.length() == 0) { - return null; - } try { Date date = TIME_FORMAT_DATE.parse(string); return new java.sql.Date(date.getTime()); @@ -257,9 +243,6 @@ public class GrokRecordReader implements RecordReader { return null; } case TIME: - if (string.length() == 0) { - return null; - } try { Date date = TIME_FORMAT_TIME.parse(string); return new java.sql.Time(date.getTime()); @@ -267,9 +250,6 @@ public class GrokRecordReader implements RecordReader { return null; } case TIMESTAMP: - if (string.length() == 0) { - return null; - } try { Date date = TIME_FORMAT_TIMESTAMP.parse(string); return new java.sql.Timestamp(date.getTime()); @@ -298,11 +278,7 @@ public class GrokRecordReader implements RecordReader { final Map<String, String> namedGroups = GrokUtils.namedGroups(matcher, grokExpression); final String fieldName = namedGroups.get("subname"); - DataType dataType = fieldTypeOverrides.get(fieldName); - if (dataType == null) { - dataType = RecordFieldType.STRING.getDataType(); - } - + DataType dataType = RecordFieldType.STRING.getDataType(); final RecordField recordField = new RecordField(fieldName, dataType); fields.add(recordField); http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index 286326a..ad04912 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -19,16 +19,11 @@ package org.apache.nifi.json; import java.io.IOException; import java.io.InputStream; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.TimeZone; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; @@ -45,7 +40,6 @@ import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonToken; import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ArrayNode; public abstract class AbstractJsonRowRecordReader implements RecordReader { @@ -57,8 +51,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { private boolean firstObjectConsumed = false; - private static final TimeZone gmt = TimeZone.getTimeZone("GMT"); - public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException { this.logger = logger; @@ -136,7 +128,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { } final RecordSchema childSchema = determineSchema(node); - return RecordFieldType.RECORD.getDataType(childSchema); + return RecordFieldType.RECORD.getRecordDataType(childSchema); } protected RecordSchema determineSchema(final JsonNode jsonNode) { @@ -155,111 +147,31 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { return new SimpleRecordSchema(recordFields); } - protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType) throws IOException, MalformedRecordException { - if (fieldNode == null || fieldNode.isNull()) { + protected Object getRawNodeValue(final JsonNode fieldNode) throws IOException { + if (fieldNode == null || !fieldNode.isValueNode()) { return null; } - switch (desiredType.getFieldType()) { - case BOOLEAN: - return fieldNode.asBoolean(); - case BYTE: - return (byte) fieldNode.asInt(); - case CHAR: - final String text = fieldNode.asText(); - if (text.isEmpty()) { - return null; - } - return text.charAt(0); - case DOUBLE: - return fieldNode.asDouble(); - case FLOAT: - return (float) fieldNode.asDouble(); - case INT: - return fieldNode.asInt(); - case LONG: - return fieldNode.asLong(); - case SHORT: - return (short) fieldNode.asInt(); - case STRING: - return fieldNode.asText(); - case DATE: { - final String string = fieldNode.asText(); - if (string.isEmpty()) { - return null; - } - - try { - final DateFormat dateFormat = new SimpleDateFormat(desiredType.getFormat()); - dateFormat.setTimeZone(gmt); - final Date date = dateFormat.parse(string); - return new java.sql.Date(date.getTime()); - } catch (ParseException e) { - logger.warn("Failed to convert JSON field to Date for field {} (value {})", new Object[] {fieldName, string, e}); - return null; - } - } - case TIME: { - final String string = fieldNode.asText(); - if (string.isEmpty()) { - return null; - } - - try { - final DateFormat dateFormat = new SimpleDateFormat(desiredType.getFormat()); - dateFormat.setTimeZone(gmt); - final Date date = dateFormat.parse(string); - return new java.sql.Date(date.getTime()); - } catch (ParseException e) { - logger.warn("Failed to convert JSON field to Time for field {} (value {})", new Object[] {fieldName, string, e}); - return null; - } - } - case TIMESTAMP: { - final String string = fieldNode.asText(); - if (string.isEmpty()) { - return null; - } - - try { - final DateFormat dateFormat = new SimpleDateFormat(desiredType.getFormat()); - dateFormat.setTimeZone(gmt); - final Date date = dateFormat.parse(string); - return new java.sql.Date(date.getTime()); - } catch (ParseException e) { - logger.warn("Failed to convert JSON field to Timestamp for field {} (value {})", new Object[] {fieldName, string, e}); - return null; - } - } - case ARRAY: { - final ArrayNode arrayNode = (ArrayNode) fieldNode; - final int numElements = arrayNode.size(); - final Object[] arrayElements = new Object[numElements]; - int count = 0; - for (final JsonNode node : arrayNode) { - final Object converted = convertField(node, fieldName, determineFieldType(node)); - arrayElements[count++] = converted; - } - - return arrayElements; - } - case RECORD: { - if (fieldNode.isObject()) { - final Optional<RecordSchema> childSchema = desiredType.getChildRecordSchema(); - if (!childSchema.isPresent()) { - return null; - } - - return convertJsonNodeToRecord(fieldNode, childSchema.get()); - } else { - return fieldNode.toString(); - } - } + if (fieldNode.isNumber()) { + return fieldNode.getNumberValue(); + } + + if (fieldNode.isBinary()) { + return fieldNode.getBinaryValue(); + } + + if (fieldNode.isBoolean()) { + return fieldNode.getBooleanValue(); + } + + if (fieldNode.isTextual()) { + return fieldNode.getTextValue(); } - return fieldNode.toString(); + return null; } + private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException { if (!firstObjectConsumed) { firstObjectConsumed = true; @@ -286,6 +198,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { } } + @Override public void close() throws IOException { jsonParser.close(); http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java index b43b1c1..467ecf8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java @@ -19,12 +19,11 @@ package org.apache.nifi.json; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; +import java.util.List; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -34,13 +33,15 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.DateTimeUtils; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RowRecordReaderFactory; -import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.SchemaRegistryRecordReader; +import org.apache.nifi.serialization.record.RecordSchema; import com.jayway.jsonpath.JsonPath; @@ -50,16 +51,27 @@ import com.jayway.jsonpath.JsonPath; + "User-defined properties define the fields that should be extracted from the JSON in order to form the fields of a Record. Any JSON field " + "that is not extracted via a JSONPath will not be returned in the JSON Records.") @SeeAlso(JsonTreeReader.class) -@DynamicProperty(name = "The field name for the record. If it is desirable to enforce that the value be coerced into a given type, its type can be included " - + "in the name by using a syntax of <field name>:<field type>. For example, \"balance:double\".", +@DynamicProperty(name = "The field name for the record.", value="A JSONPath Expression that will be evaluated against each JSON record. The result of the JSONPath will be the value of the " + "field whose name is the same as the property name.", description="User-defined properties identifiy how to extract specific fields from a JSON object in order to create a Record", supportsExpressionLanguage=false) -public class JsonPathReader extends AbstractControllerService implements RowRecordReaderFactory { +public class JsonPathReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory { + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; private volatile LinkedHashMap<String, JsonPath> jsonPaths; - private volatile Map<String, DataType> fieldTypeOverrides; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(DateTimeUtils.DATE_FORMAT); + properties.add(DateTimeUtils.TIME_FORMAT); + properties.add(DateTimeUtils.TIMESTAMP_FORMAT); + return properties; + } + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { @@ -74,7 +86,9 @@ public class JsonPathReader extends AbstractControllerService implements RowReco @OnEnabled public void compileJsonPaths(final ConfigurationContext context) { - final Map<String, DataType> fieldTypes = new HashMap<>(context.getProperties().size()); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); final LinkedHashMap<String, JsonPath> compiled = new LinkedHashMap<>(); for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { @@ -82,19 +96,13 @@ public class JsonPathReader extends AbstractControllerService implements RowReco continue; } - final String fieldName = PropertyNameUtil.getFieldName(descriptor.getName()); - final Optional<DataType> dataTypeOption = PropertyNameUtil.getDataType(descriptor.getName()); - if (dataTypeOption.isPresent()) { - fieldTypes.put(fieldName, dataTypeOption.get()); - } - + final String fieldName = descriptor.getName(); final String expression = context.getProperty(descriptor).getValue(); final JsonPath jsonPath = JsonPath.compile(expression); compiled.put(fieldName, jsonPath); } jsonPaths = compiled; - fieldTypeOverrides = fieldTypes; } @Override @@ -119,8 +127,9 @@ public class JsonPathReader extends AbstractControllerService implements RowReco } @Override - public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException { - return new JsonPathRowRecordReader(jsonPaths, fieldTypeOverrides, in, logger); + public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException { + final RecordSchema schema = getSchema(flowFile); + return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java index 9654b97..a0f3c32 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java @@ -19,30 +19,23 @@ package org.apache.nifi.json; import java.io.IOException; import java.io.InputStream; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.TimeZone; -import java.util.stream.Collectors; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.serialization.DataTypeUtils; import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; import org.codehaus.jackson.JsonNode; import com.jayway.jsonpath.Configuration; @@ -54,23 +47,27 @@ import com.jayway.jsonpath.spi.json.JacksonJsonProvider; public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build(); - private static final String TIME_FORMAT_DATE = "yyyy-MM-dd"; - private static final String TIME_FORMAT_TIME = "HH:mm:ss"; - private static final String TIME_FORMAT_TIMESTAMP = "yyyy-MM-dd HH:mm:ss"; - private static final TimeZone gmt = TimeZone.getTimeZone("GMT"); - + private final ComponentLog logger; private final LinkedHashMap<String, JsonPath> jsonPaths; - private final Map<String, DataType> fieldTypeOverrides; private final InputStream in; private RecordSchema schema; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; - public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final Map<String, DataType> fieldTypeOverrides, final InputStream in, final ComponentLog logger) + public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger, + final String dateFormat, final String timeFormat, final String timestampFormat) throws MalformedRecordException, IOException { super(in, logger); + this.dateFormat = dateFormat; + this.timeFormat = timeFormat; + this.timestampFormat = timestampFormat; + + this.schema = schema; this.jsonPaths = jsonPaths; - this.fieldTypeOverrides = fieldTypeOverrides; this.in = in; + this.logger = logger; } @Override @@ -80,60 +77,10 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { @Override public RecordSchema getSchema() { - if (schema != null) { - return schema; - } - - final Optional<JsonNode> firstNodeOption = getFirstJsonNode(); - - final List<RecordField> recordFields = new ArrayList<>(); - if (firstNodeOption.isPresent()) { - final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(firstNodeOption.get().toString()); - for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) { - final String fieldName = PropertyNameUtil.getFieldName(entry.getKey()); - final JsonPath jsonPath = entry.getValue(); - - final DataType dataType; - final DataType dataTypeOverride = fieldTypeOverrides.get(fieldName); - if (dataTypeOverride == null) { - Object value; - try { - value = ctx.read(jsonPath); - } catch (final PathNotFoundException pnfe) { - value = null; - } - - if (value == null) { - dataType = RecordFieldType.STRING.getDataType(); - } else { - dataType = DataTypeUtils.inferDataType(value); - } - } else { - dataType = dataTypeOverride; - } - - recordFields.add(new RecordField(fieldName, dataType)); - } - } - - // If there are any overridden field types that we didn't find, add as the last fields. - final Set<String> knownFieldNames = recordFields.stream() - .map(f -> f.getFieldName()) - .collect(Collectors.toSet()); - - for (final Map.Entry<String, DataType> entry : fieldTypeOverrides.entrySet()) { - if (!knownFieldNames.contains(entry.getKey())) { - recordFields.add(new RecordField(entry.getKey(), entry.getValue())); - } - } - - schema = new SimpleRecordSchema(recordFields); return schema; } - @Override - @SuppressWarnings("unchecked") protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException { if (jsonNode == null) { return null; @@ -143,138 +90,72 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) { + final String fieldName = entry.getKey(); + final DataType desiredType = schema.getDataType(fieldName).orElse(null); + if (desiredType == null) { + continue; + } + final JsonPath jsonPath = entry.getValue(); Object value; try { value = ctx.read(jsonPath); } catch (final PathNotFoundException pnfe) { + logger.debug("Evaluated JSONPath Expression {} but the path was not found; will use a null value", new Object[] {entry.getValue()}); value = null; } - final String fieldName = entry.getKey(); - if (value != null) { - final DataType determinedType = DataTypeUtils.inferDataType(value); - final DataType desiredType = schema.getDataType(fieldName).orElse(null); - - if (value instanceof List) { - value = ((List<Object>) value).toArray(); - } else if (value instanceof Map && desiredType.getFieldType() == RecordFieldType.RECORD) { - value = convert(desiredType, value); - } else if (desiredType != null && !determinedType.equals(desiredType) && shouldConvert(value, determinedType.getFieldType())) { - value = convert(desiredType, value); - } - } - + value = convert(value, desiredType); values.put(fieldName, value); } return new MapRecord(schema, values); } - private boolean shouldConvert(final Object value, final RecordFieldType determinedType) { - return determinedType != null - && determinedType != RecordFieldType.ARRAY; - } - - protected Object convert(final DataType dataType, final Object value) { - if (dataType.getFieldType() == RecordFieldType.RECORD && dataType.getChildRecordSchema().isPresent() && value instanceof Map) { - @SuppressWarnings("unchecked") - final Map<String, Object> map = (Map<String, Object>) value; - return new MapRecord(dataType.getChildRecordSchema().get(), map); - } else { - return convertString(dataType, value.toString()); + @SuppressWarnings("unchecked") + protected Object convert(final Object value, final DataType dataType) { + if (value == null) { + return null; } - } - /** - * Coerces the given string into the provided data type, if possible - * - * @param dataType the desired type - * @param string the string representation of the value - * @return an Object representing the same value as the given string but in the requested data type - */ - protected Object convertString(final DataType dataType, final String string) { - if (dataType == null) { - return string; + if (value instanceof List) { + if (dataType.getFieldType() != RecordFieldType.ARRAY) { + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type Array to " + dataType); + } + + final ArrayDataType arrayType = (ArrayDataType) dataType; + + final List<?> list = (List<?>) value; + final Object[] coercedValues = new Object[list.size()]; + int i = 0; + for (final Object rawValue : list) { + coercedValues[i++] = DataTypeUtils.convertType(rawValue, arrayType.getElementType(), dateFormat, timeFormat, timestampFormat); + } + return coercedValues; } - switch (dataType.getFieldType()) { - case BOOLEAN: - if (string.length() == 0) { - return null; - } - return Boolean.parseBoolean(string); - case BYTE: - if (string.length() == 0) { - return null; - } - return Byte.parseByte(string); - case SHORT: - if (string.length() == 0) { - return null; - } - return Short.parseShort(string); - case INT: - if (string.length() == 0) { - return null; - } - return Integer.parseInt(string); - case LONG: - if (string.length() == 0) { - return null; - } - return Long.parseLong(string); - case FLOAT: - if (string.length() == 0) { - return null; - } - return Float.parseFloat(string); - case DOUBLE: - if (string.length() == 0) { - return null; - } - return Double.parseDouble(string); - case DATE: - if (string.length() == 0) { - return null; - } - try { - final DateFormat format = new SimpleDateFormat(TIME_FORMAT_DATE); - format.setTimeZone(gmt); - Date date = format.parse(string); - return new java.sql.Date(date.getTime()); - } catch (ParseException e) { - return null; - } - case TIME: - if (string.length() == 0) { - return null; - } - try { - final DateFormat format = new SimpleDateFormat(TIME_FORMAT_TIME); - format.setTimeZone(gmt); - Date date = format.parse(string); - return new java.sql.Time(date.getTime()); - } catch (ParseException e) { - return null; - } - case TIMESTAMP: - if (string.length() == 0) { - return null; - } - try { - final DateFormat format = new SimpleDateFormat(TIME_FORMAT_TIMESTAMP); - format.setTimeZone(gmt); - Date date = format.parse(string); - return new java.sql.Timestamp(date.getTime()); - } catch (ParseException e) { - return null; + if (dataType.getFieldType() == RecordFieldType.RECORD && value instanceof Map) { + final RecordDataType recordDataType = (RecordDataType) dataType; + final RecordSchema childSchema = recordDataType.getChildSchema(); + + final Map<String, Object> rawValues = (Map<String, Object>) value; + final Map<String, Object> coercedValues = new HashMap<>(); + + for (final Map.Entry<String, Object> entry : rawValues.entrySet()) { + final String key = entry.getKey(); + final Optional<DataType> desiredTypeOption = childSchema.getDataType(key); + if (desiredTypeOption.isPresent()) { + final Object coercedValue = DataTypeUtils.convertType(entry.getValue(), desiredTypeOption.get(), dateFormat, timeFormat, timestampFormat); + coercedValues.put(key, coercedValue); } - case STRING: - default: - return string; + } + + return new MapRecord(childSchema, coercedValues); + } else { + return DataTypeUtils.convertType(value, dataType, dateFormat, timeFormat, timestampFormat); } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java index 626f56c..f28c43d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java @@ -27,18 +27,6 @@ public class JsonPathValidator implements Validator { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if (PropertyNameUtil.hasFieldType(subject) && !PropertyNameUtil.isFieldTypeValid(subject)) { - final String fieldType = PropertyNameUtil.getFieldTypeName(subject).get(); - - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(false) - .explanation("Invalid field type. If property name contains a colon (:) it must use syntax of " - + "<field name>:<field type> but the specified field type ('" + fieldType + "') is not a valid field type") - .build(); - } - try { JsonPath.compile(input); } catch (final Exception e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java index dc75a51..d09f135 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -26,14 +26,14 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.serialization.AbstractRecordSetWriter; +import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -@Tags({"json", "resultset", "writer", "serialize", "record", "row"}) -@CapabilityDescription("Writes the results of a Database ResultSet as a JSON Array. Even if the ResultSet " +@Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"}) +@CapabilityDescription("Writes the results of a RecordSet as a JSON Array. Even if the RecordSet " + "consists of a single row, it will be written as an array with a single element.") -public class JsonRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory { +public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory { static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder() .name("Pretty Print JSON") http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index 2d7072a..1abb1f4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -19,38 +19,56 @@ package org.apache.nifi.json; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; -import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.DateTimeUtils; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RowRecordReaderFactory; -import org.apache.nifi.serialization.UserTypeOverrideRowReader; +import org.apache.nifi.serialization.SchemaRegistryRecordReader; @Tags({"json", "tree", "record", "reader", "parser"}) @CapabilityDescription("Parses JSON into individual Record objects. The Record that is produced will contain all top-level " - + "elements of the corresponding JSON Object. If the JSON has nested arrays, those values will be represented as an Object array for that field. " - + "Nested JSON objects will be represented as a Map. " + + "elements of the corresponding JSON Object. " + "The root JSON element can be either a single element or an array of JSON elements, and each " - + "element in that array will be treated as a separate record. If any of the elements has a nested array or a nested " - + "element, they will be returned as OBJECT or ARRAY types (respectively), not flattened out into individual fields. " - + "The schema for the record is determined by the first JSON element in the array, if the incoming FlowFile is a JSON array. " - + "This means that if a field does not exist in the first JSON object, then it will be skipped in all subsequent JSON objects. " - + "The data type of a field can be overridden by adding a property to " - + "the controller service where the name of the property matches the JSON field name and the value of the property is " - + "the data type to use. If that field does not exist in a JSON element, the field will be assumed to be null. " - + "See the Usage of the Controller Service for more information.") + + "element in that array will be treated as a separate record. " + + "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. If the JSON contains " + + "a field that is not present in the schema, that field will be skipped. " + + "See the Usage of the Controller Service for more information and examples.") @SeeAlso(JsonPathReader.class) -@DynamicProperty(name = "<name of JSON field>", value = "<data type of JSON field>", - description = "User-defined properties are used to indicate that the values of a specific field should be interpreted as a " - + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false) -public class JsonTreeReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory { +public class JsonTreeReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory { + + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(DateTimeUtils.DATE_FORMAT); + properties.add(DateTimeUtils.TIME_FORMAT); + properties.add(DateTimeUtils.TIMESTAMP_FORMAT); + return properties; + } + + @OnEnabled + public void storeFormats(final ConfigurationContext context) { + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } @Override - public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException { - return new JsonTreeRowRecordReader(in, logger, getFieldTypeOverrides()); + public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException { + return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile), dateFormat, timeFormat, timestampFormat); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index 4a2d212..c8d07f4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -19,35 +19,40 @@ package org.apache.nifi.json; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { - private final Map<String, DataType> fieldTypeOverrides; - private RecordSchema schema; + private final RecordSchema schema; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; - public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException, MalformedRecordException { + public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, + final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { super(in, logger); - this.fieldTypeOverrides = fieldTypeOverrides; + this.schema = schema; + + this.dateFormat = dateFormat; + this.timeFormat = timeFormat; + this.timestampFormat = timestampFormat; } + @Override protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException, MalformedRecordException { if (jsonNode == null) { @@ -68,48 +73,76 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { return new MapRecord(schema, values); } - - @Override - public RecordSchema getSchema() { - if (schema != null) { - return schema; + protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType) throws IOException, MalformedRecordException { + if (fieldNode == null || fieldNode.isNull()) { + return null; } - final List<RecordField> recordFields = new ArrayList<>(); - final Optional<JsonNode> firstNodeOption = getFirstJsonNode(); - - if (firstNodeOption.isPresent()) { - final Iterator<Map.Entry<String, JsonNode>> itr = firstNodeOption.get().getFields(); - while (itr.hasNext()) { - final Map.Entry<String, JsonNode> entry = itr.next(); - final String elementName = entry.getKey(); - final JsonNode node = entry.getValue(); + switch (desiredType.getFieldType()) { + case BOOLEAN: + return DataTypeUtils.toBoolean(getRawNodeValue(fieldNode)); + case BYTE: + return DataTypeUtils.toByte(getRawNodeValue(fieldNode)); + case CHAR: + return DataTypeUtils.toCharacter(getRawNodeValue(fieldNode)); + case DOUBLE: + return DataTypeUtils.toDouble(getRawNodeValue(fieldNode)); + case FLOAT: + return DataTypeUtils.toFloat(getRawNodeValue(fieldNode)); + case INT: + return DataTypeUtils.toInteger(getRawNodeValue(fieldNode)); + case LONG: + return DataTypeUtils.toLong(getRawNodeValue(fieldNode)); + case SHORT: + return DataTypeUtils.toShort(getRawNodeValue(fieldNode)); + case STRING: + return DataTypeUtils.toString(getRawNodeValue(fieldNode), dateFormat, timeFormat, timestampFormat); + case DATE: + return DataTypeUtils.toDate(getRawNodeValue(fieldNode), dateFormat); + case TIME: + return DataTypeUtils.toTime(getRawNodeValue(fieldNode), timeFormat); + case TIMESTAMP: + return DataTypeUtils.toTimestamp(getRawNodeValue(fieldNode), timestampFormat); + case ARRAY: { + final ArrayNode arrayNode = (ArrayNode) fieldNode; + final int numElements = arrayNode.size(); + final Object[] arrayElements = new Object[numElements]; + int count = 0; + for (final JsonNode node : arrayNode) { + final DataType elementType; + if (desiredType instanceof ArrayDataType) { + elementType = ((ArrayDataType) desiredType).getElementType(); + } else { + elementType = determineFieldType(node); + } + + final Object converted = convertField(node, fieldName, elementType); + arrayElements[count++] = converted; + } - DataType dataType; - final DataType overriddenDataType = fieldTypeOverrides.get(elementName); - if (overriddenDataType == null) { - dataType = determineFieldType(node); + return arrayElements; + } + case RECORD: { + if (fieldNode.isObject()) { + final RecordSchema childSchema; + if (desiredType instanceof RecordDataType) { + childSchema = ((RecordDataType) desiredType).getChildSchema(); + } else { + return null; + } + + return convertJsonNodeToRecord(fieldNode, childSchema); } else { - dataType = overriddenDataType; + return null; } - - recordFields.add(new RecordField(elementName, dataType)); } } - // If there are any overridden field types that we didn't find, add as the last fields. - final Set<String> knownFieldNames = recordFields.stream() - .map(f -> f.getFieldName()) - .collect(Collectors.toSet()); - - for (final Map.Entry<String, DataType> entry : fieldTypeOverrides.entrySet()) { - if (!knownFieldNames.contains(entry.getKey())) { - recordFields.add(new RecordField(entry.getKey(), entry.getValue())); - } - } + return null; + } - schema = new SimpleRecordSchema(recordFields); + @Override + public RecordSchema getSchema() { return schema; } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java deleted file mode 100644 index 3b7dcf9..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.json; - -import java.util.Optional; - -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordFieldType; - -public class PropertyNameUtil { - - public static String getFieldName(final String propertyName) { - final int colonIndex = propertyName.indexOf(":"); - if (colonIndex > -1 && colonIndex < propertyName.length() - 1) { - return propertyName.substring(0, colonIndex); - } - - return propertyName; - } - - public static boolean hasFieldType(final String propertyName) { - final int colonIndex = propertyName.indexOf(":"); - return (colonIndex > -1 && colonIndex < propertyName.length() - 1); - } - - public static Optional<String> getFieldTypeName(final String propertyName) { - if (hasFieldType(propertyName)) { - final String[] splits = propertyName.split("\\:"); - if (splits.length > 1) { - return Optional.of(splits[1]); - } - return Optional.empty(); - } - - return Optional.empty(); - } - - public static Optional<String> getFieldFormat(final String propertyName) { - final String[] splits = propertyName.split("\\:"); - if (splits.length != 3) { - return Optional.empty(); - } - - return Optional.of(splits[2]); - } - - public static boolean isFieldTypeValid(final String propertyName) { - final Optional<String> fieldType = getFieldTypeName(propertyName); - if (!fieldType.isPresent()) { - return false; - } - - final String typeName = fieldType.get(); - final RecordFieldType recordFieldType = RecordFieldType.of(typeName); - return recordFieldType != null; - } - - public static Optional<DataType> getDataType(final String propertyName) { - if (isFieldTypeValid(propertyName)) { - final String typeName = getFieldTypeName(propertyName).get(); - final RecordFieldType fieldType = RecordFieldType.of(typeName); - - final Optional<String> format = getFieldFormat(propertyName); - if (format.isPresent()) { - return Optional.of(fieldType.getDataType(format.get())); - } else { - return Optional.of(fieldType.getDataType()); - } - } - - return Optional.empty(); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java index cf72b19..05895d8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -20,17 +20,10 @@ package org.apache.nifi.json; import java.io.IOException; import java.io.OutputStream; import java.math.BigInteger; -import java.sql.Array; import java.sql.SQLException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.serialization.DataTypeUtils; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; @@ -38,25 +31,29 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.stream.io.NonCloseableOutputStream; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; public class WriteJsonResult implements RecordSetWriter { - private final boolean prettyPrint; - private final ComponentLog logger; + private final boolean prettyPrint; private final JsonFactory factory = new JsonFactory(); - private final DateFormat dateFormat; - private final DateFormat timeFormat; - private final DateFormat timestampFormat; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; public WriteJsonResult(final ComponentLog logger, final boolean prettyPrint, final String dateFormat, final String timeFormat, final String timestampFormat) { this.prettyPrint = prettyPrint; - this.dateFormat = new SimpleDateFormat(dateFormat); - this.timeFormat = new SimpleDateFormat(timeFormat); - this.timestampFormat = new SimpleDateFormat(timestampFormat); + + this.dateFormat = dateFormat; + this.timeFormat = timeFormat; + this.timestampFormat = timestampFormat; + this.logger = logger; } @@ -127,26 +124,6 @@ public class WriteJsonResult implements RecordSetWriter { } } - private String createDate(final Object value, final DateFormat format) { - if (value == null) { - return null; - } - - if (value instanceof Date) { - return format.format((Date) value); - } - if (value instanceof java.sql.Date) { - return format.format(new Date(((java.sql.Date) value).getTime())); - } - if (value instanceof java.sql.Time) { - return format.format(new Date(((java.sql.Time) value).getTime())); - } - if (value instanceof java.sql.Timestamp) { - return format.format(new Date(((java.sql.Timestamp) value).getTime())); - } - - return null; - } private void writeValue(final JsonGenerator generator, final Object value, final DataType dataType, final boolean moreCols) throws JsonGenerationException, IOException, SQLException { @@ -155,50 +132,46 @@ public class WriteJsonResult implements RecordSetWriter { return; } - final DataType resolvedDataType; - if (dataType.getFieldType() == RecordFieldType.CHOICE) { - resolvedDataType = DataTypeUtils.inferDataType(value); - } else { - resolvedDataType = dataType; + final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; + final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType); + if (coercedValue == null) { + generator.writeNull(); + return; } - switch (resolvedDataType.getFieldType()) { + switch (chosenDataType.getFieldType()) { case DATE: - generator.writeString(createDate(value, dateFormat)); - break; case TIME: - generator.writeString(createDate(value, timeFormat)); - break; case TIMESTAMP: - generator.writeString(createDate(value, timestampFormat)); + generator.writeString(DataTypeUtils.toString(coercedValue, dateFormat, timeFormat, timestampFormat)); break; case DOUBLE: - generator.writeNumber(DataTypeUtils.toDouble(value, 0D)); + generator.writeNumber(DataTypeUtils.toDouble(coercedValue)); break; case FLOAT: - generator.writeNumber(DataTypeUtils.toFloat(value, 0F)); + generator.writeNumber(DataTypeUtils.toFloat(coercedValue)); break; case LONG: - generator.writeNumber(DataTypeUtils.toLong(value, 0L)); + generator.writeNumber(DataTypeUtils.toLong(coercedValue)); break; case INT: case BYTE: case SHORT: - generator.writeNumber(DataTypeUtils.toInteger(value, 0)); + generator.writeNumber(DataTypeUtils.toInteger(coercedValue)); break; case CHAR: case STRING: - generator.writeString(value.toString()); + generator.writeString(coercedValue.toString()); break; case BIGINT: - if (value instanceof Long) { - generator.writeNumber(((Long) value).longValue()); + if (coercedValue instanceof Long) { + generator.writeNumber(((Long) coercedValue).longValue()); } else { - generator.writeNumber((BigInteger) value); + generator.writeNumber((BigInteger) coercedValue); } break; case BOOLEAN: - final String stringValue = value.toString(); + final String stringValue = coercedValue.toString(); if ("true".equalsIgnoreCase(stringValue)) { generator.writeBoolean(true); } else if ("false".equalsIgnoreCase(stringValue)) { @@ -208,95 +181,34 @@ public class WriteJsonResult implements RecordSetWriter { } break; case RECORD: { - final Record record = (Record) value; + final Record record = (Record) coercedValue; writeRecord(record, generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject()); break; } case ARRAY: default: - if ("null".equals(value.toString())) { - generator.writeNull(); - } else if (value instanceof Map) { - final Map<?, ?> map = (Map<?, ?>) value; - generator.writeStartObject(); - - int i = 0; - for (final Map.Entry<?, ?> entry : map.entrySet()) { - generator.writeFieldName(entry.getKey().toString()); - final boolean moreEntries = ++i < map.size(); - writeValue(generator, entry.getValue(), getColType(entry.getValue()), moreEntries); - } - generator.writeEndObject(); - } else if (value instanceof List) { - final List<?> list = (List<?>) value; - writeArray(list.toArray(), generator); - } else if (value instanceof Array) { - final Array array = (Array) value; - final Object[] values = (Object[]) array.getArray(); - writeArray(values, generator); - } else if (value instanceof Object[]) { - final Object[] values = (Object[]) value; - writeArray(values, generator); + if (coercedValue instanceof Object[]) { + final Object[] values = (Object[]) coercedValue; + final ArrayDataType arrayDataType = (ArrayDataType) dataType; + final DataType elementType = arrayDataType.getElementType(); + writeArray(values, generator, elementType); } else { - generator.writeString(value.toString()); + generator.writeString(coercedValue.toString()); } break; } } - private void writeArray(final Object[] values, final JsonGenerator generator) throws JsonGenerationException, IOException, SQLException { + private void writeArray(final Object[] values, final JsonGenerator generator, final DataType elementType) throws JsonGenerationException, IOException, SQLException { generator.writeStartArray(); for (int i = 0; i < values.length; i++) { final boolean moreEntries = i < values.length - 1; final Object element = values[i]; - writeValue(generator, element, getColType(element), moreEntries); + writeValue(generator, element, elementType, moreEntries); } generator.writeEndArray(); } - private DataType getColType(final Object value) { - if (value instanceof String) { - return RecordFieldType.STRING.getDataType(); - } - if (value instanceof Double) { - return RecordFieldType.DOUBLE.getDataType(); - } - if (value instanceof Float) { - return RecordFieldType.FLOAT.getDataType(); - } - if (value instanceof Integer) { - return RecordFieldType.INT.getDataType(); - } - if (value instanceof Long) { - return RecordFieldType.LONG.getDataType(); - } - if (value instanceof BigInteger) { - return RecordFieldType.BIGINT.getDataType(); - } - if (value instanceof Boolean) { - return RecordFieldType.BOOLEAN.getDataType(); - } - if (value instanceof Byte || value instanceof Short) { - return RecordFieldType.INT.getDataType(); - } - if (value instanceof Character) { - return RecordFieldType.STRING.getDataType(); - } - if (value instanceof java.util.Date || value instanceof java.sql.Date) { - return RecordFieldType.DATE.getDataType(); - } - if (value instanceof java.sql.Time) { - return RecordFieldType.TIME.getDataType(); - } - if (value instanceof java.sql.Timestamp) { - return RecordFieldType.TIMESTAMP.getDataType(); - } - if (value instanceof Object[] || value instanceof List || value instanceof Array) { - return RecordFieldType.ARRAY.getDataType(); - } - - return RecordFieldType.RECORD.getDataType(); - } @Override public String getMimeType() { http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java deleted file mode 100644 index b58a22e..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization; - -import java.util.Arrays; -import java.util.List; - -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.serialization.record.RecordFieldType; - -public abstract class AbstractRecordSetWriter extends AbstractControllerService { - static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() - .name("Date Format") - .description("Specifies the format to use when writing out Date fields") - .expressionLanguageSupported(false) - .defaultValue(RecordFieldType.DATE.getDefaultFormat()) - .addValidator(new SimpleDateFormatValidator()) - .required(true) - .build(); - - static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() - .name("Time Format") - .description("Specifies the format to use when writing out Time fields") - .expressionLanguageSupported(false) - .defaultValue(RecordFieldType.TIME.getDefaultFormat()) - .addValidator(new SimpleDateFormatValidator()) - .required(true) - .build(); - - static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() - .name("Timestamp Format") - .description("Specifies the format to use when writing out Timestamp (date/time) fields") - .expressionLanguageSupported(false) - .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat()) - .addValidator(new SimpleDateFormatValidator()) - .required(true) - .build(); - - private volatile String dateFormat; - private volatile String timeFormat; - private volatile String timestampFormat; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return Arrays.asList(DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT); - } - - @OnEnabled - public void captureValues(final ConfigurationContext context) { - this.dateFormat = context.getProperty(DATE_FORMAT).getValue(); - this.timeFormat = context.getProperty(TIME_FORMAT).getValue(); - this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).getValue(); - } - - protected String getDateFormat() { - return dateFormat; - } - - protected String getTimeFormat() { - return timeFormat; - } - - protected String getTimestampFormat() { - return timestampFormat; - } -}
