Repository: nifi Updated Branches: refs/heads/master ae940d862 -> 451f9cf12
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index f05fe30..489d114 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -23,6 +23,7 @@ import java.text.DateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -69,78 +70,74 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { @Override - protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException, MalformedRecordException { - return convertJsonNodeToRecord(jsonNode, schema, null); + protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields) + throws IOException, MalformedRecordException { + return convertJsonNodeToRecord(jsonNode, schema, coerceTypes, dropUnknownFields, null); } - private Record convertJsonNodeToRecord(final JsonNode jsonNode, 
final RecordSchema schema, final String fieldNamePrefix) throws IOException, MalformedRecordException { + private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknown, final String fieldNamePrefix) + throws IOException, MalformedRecordException { if (jsonNode == null) { return null; } - final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); - for (final RecordField field : schema.getFields()) { - final String fieldName = field.getFieldName(); + return convertJsonNodeToRecord(jsonNode, schema, fieldNamePrefix, coerceTypes, dropUnknown); + } - final JsonNode fieldNode = getJsonNode(jsonNode, field); - final DataType desiredType = field.getDataType(); - final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; - final Object value = convertField(fieldNode, fullFieldName, desiredType); - values.put(fieldName, value); - } + private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix, + final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { - final Supplier<String> supplier = () -> jsonNode.toString(); - return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json")); - } + final Map<String, Object> values = new LinkedHashMap<>(); + final Iterator<String> fieldNames = jsonNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + final JsonNode childNode = jsonNode.get(fieldName); - private JsonNode getJsonNode(final JsonNode parent, final RecordField field) { - JsonNode fieldNode = parent.get(field.getFieldName()); - if (fieldNode != null) { - return fieldNode; - } + final RecordField recordField = schema.getField(fieldName).orElse(null); + if (recordField == null && dropUnknown) { + continue; + } - for (final String alias : field.getAliases()) { - fieldNode = 
parent.get(alias); - if (fieldNode != null) { - return fieldNode; + final Object value; + if (coerceTypes && recordField != null) { + final DataType desiredType = recordField.getDataType(); + final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; + value = convertField(childNode, fullFieldName, desiredType, dropUnknown); + } else { + value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); } + + values.put(fieldName, value); } - return fieldNode; + final Supplier<String> supplier = () -> jsonNode.toString(); + return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown); } - protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType) throws IOException, MalformedRecordException { + + protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException { if (fieldNode == null || fieldNode.isNull()) { return null; } switch (desiredType.getFieldType()) { case BOOLEAN: - return DataTypeUtils.toBoolean(getRawNodeValue(fieldNode), fieldName); case BYTE: - return DataTypeUtils.toByte(getRawNodeValue(fieldNode), fieldName); case CHAR: - return DataTypeUtils.toCharacter(getRawNodeValue(fieldNode), fieldName); case DOUBLE: - return DataTypeUtils.toDouble(getRawNodeValue(fieldNode), fieldName); case FLOAT: - return DataTypeUtils.toFloat(getRawNodeValue(fieldNode), fieldName); case INT: - return DataTypeUtils.toInteger(getRawNodeValue(fieldNode), fieldName); case LONG: - return DataTypeUtils.toLong(getRawNodeValue(fieldNode), fieldName); case SHORT: - return DataTypeUtils.toShort(getRawNodeValue(fieldNode), fieldName); case STRING: - return DataTypeUtils.toString(getRawNodeValue(fieldNode), - () -> DataTypeUtils.getDateFormat(desiredType.getFieldType(), LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, 
LAZY_TIMESTAMP_FORMAT)); case DATE: - return DataTypeUtils.toDate(getRawNodeValue(fieldNode), LAZY_DATE_FORMAT, fieldName); case TIME: - return DataTypeUtils.toTime(getRawNodeValue(fieldNode), LAZY_TIME_FORMAT, fieldName); - case TIMESTAMP: - return DataTypeUtils.toTimestamp(getRawNodeValue(fieldNode), LAZY_TIMESTAMP_FORMAT, fieldName); + case TIMESTAMP: { + final Object rawValue = getRawNodeValue(fieldNode); + final Object converted = DataTypeUtils.convertType(rawValue, desiredType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + return converted; + } case MAP: { final DataType valueType = ((MapDataType) desiredType).getValueType(); @@ -149,7 +146,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { while (fieldNameItr.hasNext()) { final String childName = fieldNameItr.next(); final JsonNode childNode = fieldNode.get(childName); - final Object childValue = convertField(childNode, fieldName, valueType); + final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown); map.put(childName, childValue); } @@ -162,7 +159,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { int count = 0; for (final JsonNode node : arrayNode) { final DataType elementType = ((ArrayDataType) desiredType).getElementType(); - final Object converted = convertField(node, fieldName, elementType); + final Object converted = convertField(node, fieldName, elementType, dropUnknown); arrayElements[count++] = converted; } @@ -187,7 +184,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { childSchema = new SimpleRecordSchema(fields); } - return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + "."); + return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown); } else { return null; } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java index 96b1995..8f3bf22 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -29,7 +29,9 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessWriter; import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RawRecordWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; @@ -44,7 +46,7 @@ import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; -public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter { +public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter { private final 
ComponentLog logger; private final SchemaAccessWriter schemaAccess; private final RecordSchema recordSchema; @@ -107,6 +109,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe } } + @Override public Map<String, String> writeRecord(final Record record) throws IOException { // If we are not writing an active record set, then we need to ensure that we write the @@ -116,12 +119,25 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe schemaAccess.writeHeader(recordSchema, getOutputStream()); } - writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); + writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true); return schemaAccess.getAttributes(recordSchema); } - private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask) - throws JsonGenerationException, IOException { + @Override + public WriteResult writeRawRecord(final Record record) throws IOException { + // If we are not writing an active record set, then we need to ensure that we write the + // schema information. 
+ if (!isActiveRecordSet()) { + generator.flush(); + schemaAccess.writeHeader(recordSchema, getOutputStream()); + } + + writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false); + return WriteResult.of(incrementRecordCount(), schemaAccess.getAttributes(recordSchema)); + } + + private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, + final GeneratorTask startTask, final GeneratorTask endTask, final boolean schemaAware) throws JsonGenerationException, IOException { final Optional<SerializedForm> serializedForm = record.getSerializedForm(); if (serializedForm.isPresent()) { @@ -137,21 +153,36 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe try { startTask.apply(generator); - for (int i = 0; i < writeSchema.getFieldCount(); i++) { - final RecordField field = writeSchema.getField(i); - final String fieldName = field.getFieldName(); - final Object value = record.getValue(field); - if (value == null) { - generator.writeNullField(fieldName); - continue; - } - generator.writeFieldName(fieldName); - final DataType dataType = writeSchema.getDataType(fieldName).get(); + if (schemaAware) { + for (final RecordField field : writeSchema.getFields()) { + final String fieldName = field.getFieldName(); + final Object value = record.getValue(field); + if (value == null) { + generator.writeNullField(fieldName); + continue; + } + + generator.writeFieldName(fieldName); - writeValue(generator, value, fieldName, dataType, i < writeSchema.getFieldCount() - 1); + final DataType dataType = writeSchema.getDataType(fieldName).get(); + writeValue(generator, value, fieldName, dataType); + } + } else { + for (final String fieldName : record.getRawFieldNames()) { + final Object value = record.getValue(fieldName); + if (value == null) { + generator.writeNullField(fieldName); + continue; + } + + generator.writeFieldName(fieldName); + writeRawValue(generator, value, 
fieldName); + } } + + endTask.apply(generator); } catch (final Exception e) { logger.error("Failed to write {} with schema {} as a JSON Object due to {}", new Object[] {record, record.getSchema(), e.toString(), e}); @@ -159,9 +190,51 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe } } + @SuppressWarnings("unchecked") + private void writeRawValue(final JsonGenerator generator, final Object value, final String fieldName) + throws JsonGenerationException, IOException { + + if (value == null) { + generator.writeNull(); + return; + } + + if (value instanceof Record) { + final Record record = (Record) value; + writeRecord(record, record.getSchema(), generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject(), false); + return; + } + + if (value instanceof Map) { + final Map<String, ?> map = (Map<String, ?>) value; + generator.writeStartObject(); + + for (final Map.Entry<String, ?> entry : map.entrySet()) { + final String mapKey = entry.getKey(); + final Object mapValue = entry.getValue(); + generator.writeFieldName(mapKey); + writeRawValue(generator, mapValue, fieldName + "." 
+ mapKey); + } + + generator.writeEndObject(); + return; + } + + if (value instanceof Object[]) { + final Object[] values = (Object[]) value; + generator.writeStartArray(); + for (final Object element : values) { + writeRawValue(generator, element, fieldName); + } + generator.writeEndArray(); + return; + } + + generator.writeObject(value); + } @SuppressWarnings("unchecked") - private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType, final boolean moreCols) + private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws JsonGenerationException, IOException { if (value == null) { generator.writeNull(); @@ -242,7 +315,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe final Record record = (Record) coercedValue; final RecordDataType recordDataType = (RecordDataType) chosenDataType; final RecordSchema childSchema = recordDataType.getChildSchema(); - writeRecord(record, childSchema, generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject()); + writeRecord(record, childSchema, generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject(), true); break; } case MAP: { @@ -250,12 +323,12 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe final DataType valueDataType = mapDataType.getValueType(); final Map<String, ?> map = (Map<String, ?>) coercedValue; generator.writeStartObject(); - int i = 0; + for (final Map.Entry<String, ?> entry : map.entrySet()) { final String mapKey = entry.getKey(); final Object mapValue = entry.getValue(); generator.writeFieldName(mapKey); - writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType, ++i < map.size()); + writeValue(generator, mapValue, fieldName + "." 
+ mapKey, valueDataType); } generator.writeEndObject(); break; @@ -278,9 +351,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe throws JsonGenerationException, IOException { generator.writeStartArray(); for (int i = 0; i < values.length; i++) { - final boolean moreEntries = i < values.length - 1; final Object element = values[i]; - writeValue(generator, element, fieldName, elementType, moreEntries); + writeValue(generator, element, fieldName, elementType); } generator.writeEndArray(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html index 5a08269..8e18934 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html @@ -73,42 +73,44 @@ <h2>Examples</h2> + <h3>Example 1</h3> + <p> As an example, consider a FlowFile whose contents consists of the following: </p> - <code> - id, name, balance, join_date, notes<br /> - 1, John, 48.23, 04/03/2007 "Our very<br /> +<code> +id, name, balance, join_date, notes<br /> +1, John, 48.23, 
04/03/2007 "Our very<br /> first customer!"<br /> - 2, Jane, 1245.89, 08/22/2009,<br /> - 3, Frank Franklin, "48481.29", 04/04/2016,<br /> - </code> +2, Jane, 1245.89, 08/22/2009,<br /> +3, Frank Franklin, "48481.29", 04/04/2016,<br /> +</code> <p> Additionally, let's consider that this Controller Service is configured with the Schema Registry pointing to an AvroSchemaRegistry and the schema is configured as the following: </p> - <code> - <pre> - { - "namespace": "nifi", - "name": "balances", - "type": "record", - "fields": [ - { "name": "id", "type": "int" }, - { "name": "name": "type": "string" }, - { "name": "balance": "type": "double" }, - { "name": "join_date", "type": { - "type": "int", - "logicalType": "date" - }, - { "name": "notes": "type": "string" } - ] - } - </pre> - </code> +<code> +<pre> +{ + "namespace": "nifi", + "name": "balances", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name": "type": "string" }, + { "name": "balance": "type": "double" }, + { "name": "join_date", "type": { + "type": "int", + "logicalType": "date" + }}, + { "name": "notes": "type": "string" } + ] +} +</pre> +</code> <p> In the example above, we see that the 'join_date' column is a Date type. In order for the CSV Reader to be able to properly parse a value as a date, @@ -211,7 +213,122 @@ first customer!"<br /> </tr> </body> </table> + + + + <h3>Example 2 - Schema with CSV Header Line</h3> + + <p> + When CSV data consists of a header line that outlines the column names, the reader provides + a couple of different properties for configuring how to handle these column names. The + "Schema Access Strategy" property as well as the associated properties ("Schema Registry," "Schema Text," and + "Schema Name" properties) can be used to specify how to obtain the schema. If the "Schema Access Strategy" is set + to "Use String Fields From Header" then the header line of the CSV will be used to determine the schema. 
Otherwise, + a schema will be referenced elsewhere. But what happens if a schema is obtained from a Schema Registry, for instance, + and the CSV Header indicates a different set of column names? + </p> + <p> + For example, let's say that the following schema is obtained from the Schema Registry: + </p> + +<code> +<pre> +{ + "namespace": "nifi", + "name": "balances", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name": "type": "string" }, + { "name": "balance": "type": "double" }, + { "name": "memo": "type": "string" } + ] +} +</pre> +</code> + + <p> + And the CSV contains the following data: + </p> + +<code> +<pre> +id, name, balance, notes +1, John Doe, 123.45, First Customer +</pre> +</code> + + <p> + Note here that our schema indicates that the final column is named "memo" whereas the CSV Header indicates that it is named "notes." + </p> + <p> + In this case, the reader will look at the "Ignore CSV Header Column Names" property. If this property is set to "true" then the column names + provided in the CSV will simply be ignored and the last column will be called "memo." However, if the "Ignore CSV Header Column Names" property + is set to "false" then the result will be that the last column will be named "notes" and each record will have a null value for the "memo" column. 
+ </p> + + <p> + With "Ignore CSV Header Column Names" property set to "false":<br /> + <table> + <head> + <th>Field Name</th> + <th>Field Value</th> + </head> + <body> + <tr> + <td>id</td> + <td>1</td> + </tr> + <tr> + <td>name</td> + <td>John Doe</td> + </tr> + <tr> + <td>balance</td> + <td>123.45</td> + </tr> + <tr> + <td>memo</td> + <td>First Customer</td> + </tr> + </body> + </table> + </p> + + + <p> + With "Ignore CSV Header Column Names" property set to "true":<br /> + <table> + <head> + <th>Field Name</th> + <th>Field Value</th> + </head> + <body> + <tr> + <td>id</td> + <td>1</td> + </tr> + <tr> + <td>name</td> + <td>John Doe</td> + </tr> + <tr> + <td>balance</td> + <td>123.45</td> + </tr> + <tr> + <td>notes</td> + <td>First Customer</td> + </tr> + <tr> + <td>memo</td> + <td><code>null</code></td> + </tr> + </body> + </table> + </p> + </body> </html> http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java index 5c04cfa..b5fd869 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java @@ -253,8 +253,8 @@ public class TestAvroReaderWithEmbeddedSchema { accountValues.put("accountId", 83L); final List<RecordField> accountRecordFields = new ArrayList<>(); - accountRecordFields.add(new RecordField("accountId", RecordFieldType.LONG.getDataType())); - accountRecordFields.add(new RecordField("accountName", RecordFieldType.STRING.getDataType())); + accountRecordFields.add(new RecordField("accountId", RecordFieldType.LONG.getDataType(), false)); + accountRecordFields.add(new RecordField("accountName", RecordFieldType.STRING.getDataType(), false)); final RecordSchema accountRecordSchema = new SimpleRecordSchema(accountRecordFields); final Record mapRecord = new MapRecord(accountRecordSchema, accountValues); @@ -269,8 +269,8 @@ public class TestAvroReaderWithEmbeddedSchema { dogMap.put("dogTailLength", 14); final List<RecordField> dogRecordFields = new ArrayList<>(); - dogRecordFields.add(new RecordField("dogTailLength", RecordFieldType.INT.getDataType())); - dogRecordFields.add(new RecordField("dogName", RecordFieldType.STRING.getDataType())); + dogRecordFields.add(new RecordField("dogTailLength", RecordFieldType.INT.getDataType(), false)); + dogRecordFields.add(new RecordField("dogName", RecordFieldType.STRING.getDataType(), false)); final RecordSchema dogRecordSchema = new SimpleRecordSchema(dogRecordFields); final Record dogRecord = new MapRecord(dogRecordSchema, dogMap); @@ -281,8 +281,8 @@ public class TestAvroReaderWithEmbeddedSchema { catMap.put("catTailLength", 1); final List<RecordField> catRecordFields = new ArrayList<>(); - catRecordFields.add(new RecordField("catTailLength", RecordFieldType.INT.getDataType())); - catRecordFields.add(new RecordField("catName", RecordFieldType.STRING.getDataType())); + catRecordFields.add(new RecordField("catTailLength", 
RecordFieldType.INT.getDataType(), false)); + catRecordFields.add(new RecordField("catName", RecordFieldType.STRING.getDataType(), false)); final RecordSchema catRecordSchema = new SimpleRecordSchema(catRecordFields); final Record catRecord = new MapRecord(catRecordSchema, catMap); http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java index a02e0b1..3533a43 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java @@ -57,6 +57,11 @@ public class TestCSVRecordReader { return fields; } + private CSVRecordReader createReader(final InputStream in, final RecordSchema schema) throws IOException { + return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + } + @Test public void testDate() throws IOException, MalformedRecordException { final String text = "date\n11/30/1983"; @@ -66,7 +71,7 @@ public class TestCSVRecordReader { final RecordSchema schema = new SimpleRecordSchema(fields); try 
(final InputStream bais = new ByteArrayInputStream(text.getBytes()); - final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { final Record record = reader.nextRecord(); @@ -87,9 +92,8 @@ public class TestCSVRecordReader { final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) { - final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv")); + final CSVRecordReader reader = createReader(fis, schema)) { final Object[] record = reader.nextRecord().getValues(); final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; @@ -106,9 +110,8 @@ public class TestCSVRecordReader { final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) { - final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv")); + final CSVRecordReader reader = createReader(fis, schema)) { final Object[] firstRecord = reader.nextRecord().getValues(); final Object[] 
firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; @@ -129,9 +132,8 @@ public class TestCSVRecordReader { final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) { - final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv")); + final CSVRecordReader reader = createReader(fis, schema)) { final Object[] firstRecord = reader.nextRecord().getValues(); final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; @@ -157,9 +159,8 @@ public class TestCSVRecordReader { final String csvData = headerLine + "\n" + inputRecord; final byte[] inputData = csvData.getBytes(); - try (final InputStream baos = new ByteArrayInputStream(inputData)) { - final CSVRecordReader reader = new CSVRecordReader(baos, Mockito.mock(ComponentLog.class), schema, format, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + try (final InputStream bais = new ByteArrayInputStream(inputData); + final CSVRecordReader reader = createReader(bais, schema)) { final Record record = reader.nextRecord(); assertNotNull(record); @@ -176,4 +177,149 @@ public class TestCSVRecordReader { assertNull(reader.nextRecord()); } } + + @Test + public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException { + final List<RecordField> fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, 
zipCode, continent"; + final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, North America"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // test nextRecord does not contain a 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final CSVRecordReader reader = createReader(bais, schema)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertNull(record.getValue("continent")); + + assertNull(reader.nextRecord()); + } + + // test nextRawRecord does contain 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final CSVRecordReader reader = createReader(bais, schema)) { + + final Record record = reader.nextRecord(false, false); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertEquals("North America", record.getValue("continent")); + + assertNull(reader.nextRecord(false, false)); + } + } + + + @Test + public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException { + final List<RecordField> fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + 
final String headerLine = "id, name, balance, address, city, state, zipCode"; + final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + try (final InputStream bais = new ByteArrayInputStream(inputData); + final CSVRecordReader reader = createReader(bais, schema)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + + // If schema says that there are fields a, b, c + // and the CSV has a header line that says field names are a, b + // and then the data has values 1,2,3 + // then a=1, b=2, c=null + assertNull(record.getValue("country")); + + assertNull(reader.nextRecord()); + } + + // Create another Record Reader that indicates that the header line is present but should be ignored. This should cause + // our schema to be the definitive list of what fields exist. 
+ try (final InputStream bais = new ByteArrayInputStream(inputData); + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + + // If schema says that there are fields a, b, c + // and the CSV has a header line that says field names are a, b + // and then the data has values 1,2,3 + // then a=1, b=2, c=null + // But if we configure the reader to Ignore the header, then this will not occur! 
+ assertEquals("USA", record.getValue("country")); + + assertNull(reader.nextRecord()); + + } + + } + + @Test + public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException { + final List<RecordField> fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, country"; + final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA, North America"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // test that reading a raw record (no coercion, no dropping of unknown fields) exposes the extra trailing value that has no header column + try (final InputStream bais = new ByteArrayInputStream(inputData); + final CSVRecordReader reader = createReader(bais, schema)) { + + final Record record = reader.nextRecord(false, false); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertEquals("USA", record.getValue("country")); + assertEquals("North America", record.getValue("unknown_field_index_8")); + + assertNull(reader.nextRecord(false, false)); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java index c447664..0285796 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java @@ -30,6 +30,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -127,6 +128,172 @@ public class TestWriteCSVResult { assertEquals(expectedValues, values); } + @Test + public void testExtraFieldInWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("id", "1"); + values.put("name", "John"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + + writer.beginRecordSet(); + writer.write(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id\n1\n", output); + } + + @Test + public void 
testExtraFieldInWriteRawRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("name", "John"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,John\n", output); + } + + @Test + public void testMissingFieldWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + + writer.beginRecordSet(); + writer.writeRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,\n", output); + } + + @Test + public void testMissingFieldWriteRawRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,\n", output); + } + + + @Test + public void testMissingAndExtraFieldWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("dob", "1/1/1970"); + final Record record = new 
MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + + writer.beginRecordSet(); + writer.writeRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,\n", output); + } + + @Test + public void testMissingAndExtraFieldWriteRawRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("dob", "1/1/1970"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,dob,name\n1,1/1/1970,\n", output); + } + + private DateFormat getDateFormat(final String format) { final DateFormat df = new SimpleDateFormat(format); df.setTimeZone(TimeZone.getTimeZone("gmt")); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java index 4317728..b849c0a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java @@ -48,7 +48,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true); + final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"}; final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"}; @@ -78,7 +78,7 @@ public class TestGrokRecordReader { final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n" + "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack 
traces"; final InputStream bais = new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)); - final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, GrokReader.createRecordSchema(grok), true); + final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); final Object[] values = deserializer.nextRecord().getValues(); @@ -101,7 +101,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true); + final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"}; @@ -125,7 +125,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true); + final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"}; @@ -157,7 +157,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true); + final GrokRecordReader 
deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true); final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"}; final String[] messages = new String[] {"message without stack trace", @@ -211,7 +211,7 @@ public class TestGrokRecordReader { assertTrue(fieldNames.contains("message")); assertTrue(fieldNames.contains("stackTrace")); // always implicitly there - final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, true); + final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, true); final Record record = deserializer.nextRecord(); assertEquals("May 22 15:58:23", record.getValue("timestamp")); @@ -248,7 +248,7 @@ public class TestGrokRecordReader { assertTrue(fieldNames.contains("fourth")); assertTrue(fieldNames.contains("fifth")); - final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, false); + final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, false); final Record record = deserializer.nextRecord(); assertEquals("1", record.getValue("first")); @@ -283,7 +283,7 @@ public class TestGrokRecordReader { assertTrue(fieldNames.contains("fourth")); assertTrue(fieldNames.contains("fifth")); - final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, false); + final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, false); for (int i = 0; i < 2; i++) { final Record record = deserializer.nextRecord(); http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index c83d0dc..e898edd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -162,6 +162,74 @@ public class TestJsonTreeRowRecordReader { } @Test + public void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final Record schemaValidatedRecord = reader.nextRecord(); + assertEquals(1, schemaValidatedRecord.getValue("id")); + assertEquals("John Doe", schemaValidatedRecord.getValue("name")); + assertNull(schemaValidatedRecord.getValue("balance")); + } + + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) { + + final Record rawRecord = reader.nextRecord(false, false); + assertEquals(1, rawRecord.getValue("id")); + assertEquals("John Doe", rawRecord.getValue("name")); + assertEquals(4750.89, rawRecord.getValue("balance")); + assertEquals("123 My Street", rawRecord.getValue("address")); + assertEquals("My City", rawRecord.getValue("city")); + assertEquals("MS", rawRecord.getValue("state")); + assertEquals("11111", rawRecord.getValue("zipCode")); + assertEquals("USA", rawRecord.getValue("country")); + } + } + + + @Test + public void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final Record schemaValidatedRecord = reader.nextRecord(); + assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema + assertEquals("John Doe", schemaValidatedRecord.getValue("name")); + assertNull(schemaValidatedRecord.getValue("balance")); + + assertEquals(2, schemaValidatedRecord.getRawFieldNames().size()); + } + + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final Record rawRecord = reader.nextRecord(false, false); + assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1 + assertEquals("John Doe", 
rawRecord.getValue("name")); + assertEquals(4750.89, rawRecord.getValue("balance")); + assertEquals("123 My Street", rawRecord.getValue("address")); + assertEquals("My City", rawRecord.getValue("city")); + assertEquals("MS", rawRecord.getValue("state")); + assertEquals("11111", rawRecord.getValue("zipCode")); + assertEquals("USA", rawRecord.getValue("country")); + + assertEquals(8, rawRecord.getRawFieldNames().size()); + } + } + + + @Test public void testSingleJsonElement() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java index 16d2012..2dbf146 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java @@ -183,4 +183,162 @@ public class TestWriteJsonResult { final String output = new String(data, StandardCharsets.UTF_8); assertEquals(expected, output); } + + @Test + public void testExtraFieldInWriteRecord() throws IOException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("id", "1"); + values.put("name", "John"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) { + writer.beginRecordSet(); + writer.writeRecord(record); + writer.finishRecordSet(); + } + + final byte[] data = baos.toByteArray(); + + final String expected = "[{\"id\":\"1\"}]"; + + final String output = new String(data, StandardCharsets.UTF_8); + assertEquals(expected, output); + } + + @Test + public void testExtraFieldInWriteRawRecord() throws IOException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("name", "John"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) { + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + } + + final byte[] data = baos.toByteArray(); + + final String expected = "[{\"id\":\"1\",\"name\":\"John\"}]"; + + final String output = new String(data, StandardCharsets.UTF_8); + assertEquals(expected, output); + } + + @Test + public void testMissingFieldInWriteRecord() throws IOException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new 
RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) { + writer.beginRecordSet(); + writer.writeRecord(record); + writer.finishRecordSet(); + } + + final byte[] data = baos.toByteArray(); + + final String expected = "[{\"id\":\"1\",\"name\":null}]"; + + final String output = new String(data, StandardCharsets.UTF_8); + assertEquals(expected, output); + } + + @Test + public void testMissingFieldInWriteRawRecord() throws IOException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) { + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + } + + final byte[] data = baos.toByteArray(); + + final String expected = "[{\"id\":\"1\"}]"; + + final String output = new String(data, StandardCharsets.UTF_8); + assertEquals(expected, output); + } + + @Test + public void testMissingAndExtraFieldInWriteRecord() throws IOException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("dob", "1/1/1970"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) { + writer.beginRecordSet(); + writer.writeRecord(record); + writer.finishRecordSet(); + } + + final byte[] data = baos.toByteArray(); + + final String expected = "[{\"id\":\"1\",\"name\":null}]"; + + final String output = new String(data, StandardCharsets.UTF_8); + assertEquals(expected, output); + } + + @Test + public void testMissingAndExtraFieldInWriteRawRecord() throws IOException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("dob", "1/1/1970"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) { + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + } + + final byte[] data = baos.toByteArray(); + + final String expected = "[{\"id\":\"1\",\"dob\":\"1/1/1970\"}]"; + + final String output = new String(data, StandardCharsets.UTF_8); + assertEquals(expected, output); + } }
