exceptionfactory commented on code in PR #7665:
URL: https://github.com/apache/nifi/pull/7665#discussion_r1329269945
##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -31,29 +29,23 @@
public class JsonRecordSource implements RecordSource<JsonNode> {
private static final Logger logger =
LoggerFactory.getLogger(JsonRecordSource.class);
- private static final JsonFactory jsonFactory;
private final JsonParser jsonParser;
private final StartingFieldStrategy strategy;
- private final String startingFieldName;
-
- static {
- jsonFactory = new JsonFactory();
- jsonFactory.setCodec(new ObjectMapper());
- }
public JsonRecordSource(final InputStream in) throws IOException {
- jsonParser = jsonFactory.createParser(in);
- strategy = null;
- startingFieldName = null;
+ this(in, null, null);
}
public JsonRecordSource(final InputStream in, final StartingFieldStrategy
strategy, final String startingFieldName) throws IOException {
- jsonParser = jsonFactory.createParser(in);
+ this(in , strategy, startingFieldName, new JsonParserFactory());
+ }
+
+ public JsonRecordSource(final InputStream in, final StartingFieldStrategy
strategy, final String startingFieldName, TokenParserFactory
tokenParserFactory) throws IOException {
+ jsonParser = tokenParserFactory.getJsonParser(in);
this.strategy = strategy;
- this.startingFieldName = startingFieldName;
- if (strategy == StartingFieldStrategy.NESTED_FIELD) {
- final SerializedString serializedNestedField = new
SerializedString(this.startingFieldName);
+ if (StartingFieldStrategy.NESTED_FIELD.equals(strategy)) {
Review Comment:
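The comparison of `enum` values should use `==` instead of `equals()`,
although both will work: `==` is null-safe on both operands and lets the
compiler reject comparisons between unrelated types.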
```suggestion
if (StartingFieldStrategy.NESTED_FIELD == strategy) {
```
##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java:
##########
@@ -0,0 +1,1362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.yaml;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.json.JsonSchemaInference;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class TestYamlTreeRowRecordReader {
+ private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+ private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+ private final String timestampFormat =
RecordFieldType.TIMESTAMP.getDefaultFormat();
+
+ private List<RecordField> getDefaultFields() {
+ return getFields(RecordFieldType.DOUBLE.getDataType());
+ }
+
+ private List<RecordField> getFields(final DataType balanceDataType) {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("balance", balanceDataType));
+ fields.add(new RecordField("address",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("city",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("state",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("zipCode",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("country",
RecordFieldType.STRING.getDataType()));
+ return fields;
+ }
+
+ private RecordSchema getAccountSchema() {
+ final List<RecordField> accountFields = new ArrayList<>();
+ accountFields.add(new RecordField("id",
RecordFieldType.INT.getDataType()));
+ accountFields.add(new RecordField("balance",
RecordFieldType.DOUBLE.getDataType()));
+
+ return new SimpleRecordSchema(accountFields);
+ }
+
+ @Test
+ void testReadChoiceOfStringOrArrayOfRecords() throws IOException,
MalformedRecordException {
+ final File schemaFile = new
File("src/test/resources/json/choice-of-string-or-array-record.avsc");
+ final File jsonFile = new
File("src/test/resources/yaml/choice-of-string-or-array-record.yaml");
+
+ final Schema avroSchema = new Schema.Parser().parse(schemaFile);
+ final RecordSchema recordSchema =
AvroTypeUtil.createSchema(avroSchema);
+
+ try (final InputStream fis = new FileInputStream(jsonFile);
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema,
dateFormat, timeFormat, timestampFormat)) {
+
+ final Record record = reader.nextRecord();
+ final Object[] fieldsArray = record.getAsArray("fields");
+ assertEquals(2, fieldsArray.length);
+
+ final Object firstElement = fieldsArray[0];
+ assertTrue(firstElement instanceof Record);
+ assertEquals("string", ((Record)
firstElement).getAsString("type"));
+
+ final Object secondElement = fieldsArray[1];
+ assertTrue(secondElement instanceof Record);
+ final Object[] typeArray = ((Record)
secondElement).getAsArray("type");
+ assertEquals(1, typeArray.length);
+
+ final Object firstType = typeArray[0];
+ assertTrue(firstType instanceof Record);
+ final Record firstTypeRecord = (Record) firstType;
+ assertEquals("string", firstTypeRecord.getAsString("type"));
+ }
+ }
+
+ @Test
+ void testChoiceOfRecordTypes() throws IOException,
MalformedRecordException {
+ final Schema avroSchema = new Schema.Parser().parse(new
File("src/test/resources/json/record-choice.avsc"));
+ final RecordSchema recordSchema =
AvroTypeUtil.createSchema(avroSchema);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/elements-for-record-choice.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat,
timeFormat, timestampFormat)) {
+
+ // evaluate first record
+ final Record firstRecord = reader.nextRecord();
+ assertNotNull(firstRecord);
+ final RecordSchema firstOuterSchema = firstRecord.getSchema();
+ assertEquals(Arrays.asList("id", "child"),
firstOuterSchema.getFieldNames());
+ assertEquals("1234", firstRecord.getValue("id"));
+
+ // record should have a schema that indicates that the 'child' is
a CHOICE of 2 different record types
+ assertSame(RecordFieldType.CHOICE,
firstOuterSchema.getDataType("child").get().getFieldType());
+ final List<DataType> firstSubTypes = ((ChoiceDataType)
firstOuterSchema.getDataType("child").get()).getPossibleSubTypes();
+ assertEquals(2, firstSubTypes.size());
+ assertEquals(2L, firstSubTypes.stream().filter(type ->
type.getFieldType() == RecordFieldType.RECORD).count());
+
+ // child record should have a schema with "id" as the only field
+ final Object childObject = firstRecord.getValue("child");
+ assertTrue(childObject instanceof Record);
+ final Record firstChildRecord = (Record) childObject;
+ final RecordSchema firstChildSchema = firstChildRecord.getSchema();
+
+ assertEquals(Collections.singletonList("id"),
firstChildSchema.getFieldNames());
+
+ // evaluate second record
+ final Record secondRecord = reader.nextRecord();
+ assertNotNull(secondRecord);
+
+ final RecordSchema secondOuterSchema = secondRecord.getSchema();
+ assertEquals(Arrays.asList("id", "child"),
secondOuterSchema.getFieldNames());
+ assertEquals("1234", secondRecord.getValue("id"));
+
+ // record should have a schema that indicates that the 'child' is
a CHOICE of 2 different record types
+ assertSame(RecordFieldType.CHOICE,
secondOuterSchema.getDataType("child").get().getFieldType());
+ final List<DataType> secondSubTypes = ((ChoiceDataType)
secondOuterSchema.getDataType("child").get()).getPossibleSubTypes();
+ assertEquals(2, secondSubTypes.size());
+ assertEquals(2L, secondSubTypes.stream().filter(type ->
type.getFieldType() == RecordFieldType.RECORD).count());
+
+ // child record should have a schema with "name" as the only field
+ final Object secondChildObject = secondRecord.getValue("child");
+ assertTrue(secondChildObject instanceof Record);
+ final Record secondChildRecord = (Record) secondChildObject;
+ final RecordSchema secondChildSchema =
secondChildRecord.getSchema();
+
+ assertEquals(Collections.singletonList("name"),
secondChildSchema.getFieldNames());
+
+ assertNull(reader.nextRecord());
+ }
+
+ }
+
+ @Test
+ void testReadArray() throws IOException, MalformedRecordException {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ final Object[] secondRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ @Disabled("Not sure there is such a thing as one Yaml doc per line")
+ void testReadOneLinePerJSON() throws IOException, MalformedRecordException
{
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-oneline.json");
+ final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ final Object[] secondRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ @Disabled("TODO Determine whether this is possible in Yaml")
+ void testReadMultilineJSON() throws IOException, MalformedRecordException {
+ final List<RecordField> fields =
getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-multiline.json");
+ final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DECIMAL, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe",
BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"},
firstRecordValues);
+
+ final Object[] secondRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {2, "Jane Doe",
BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333",
"USA"}, secondRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ @Disabled("TODO Determine whether this is possible in Yaml as two arrays
may end up as one array")
+ void testReadMultilineArrays() throws IOException,
MalformedRecordException {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-multiarray.json");
+ final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ final Object[] secondRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+ final Object[] thirdRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
+
+ final Object[] fourthRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ @Disabled("Not sure this makes sense in Yaml")
+ void testReadMixedJSON() throws IOException, MalformedRecordException {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-mixed.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ final Object[] secondRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+ //final Object[] thirdRecordValues =
reader.nextRecord().getValues();
+ //assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
+
+ final Object[] fourthRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
+
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ void testReadRawRecordIncludesFieldsNotInSchema() throws IOException,
MalformedRecordException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final Record schemaValidatedRecord = reader.nextRecord(true, true);
+ assertEquals(1, schemaValidatedRecord.getValue("id"));
+ assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
+ assertNull(schemaValidatedRecord.getValue("balance"));
+ }
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final Record rawRecord = reader.nextRecord(false, false);
+ assertEquals(1, rawRecord.getValue("id"));
+ assertEquals("John Doe", rawRecord.getValue("name"));
+ assertEquals(4750.89, rawRecord.getValue("balance"));
+ assertEquals("123 My Street", rawRecord.getValue("address"));
+ assertEquals("My City", rawRecord.getValue("city"));
+ assertEquals("MS", rawRecord.getValue("state"));
+ assertEquals("11111", rawRecord.getValue("zipCode"));
+ assertEquals("USA", rawRecord.getValue("country"));
+ }
+ }
+
+ @Test
+ void testReadRawRecordFieldOrderPreserved() throws IOException,
MalformedRecordException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final String expectedMap = "{id=1, name=John Doe, address=123 My
Street, city=My City, state=MS, zipCode=11111, country=USA,
account=MapRecord[{id=42, balance=4750.89}]}";
+ final String expectedRecord = String.format("MapRecord[%s]",
expectedMap);
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/single-element-nested.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final Record rawRecord = reader.nextRecord(false, false);
+
+ assertEquals(expectedRecord, rawRecord.toString());
+
+ final Map<String, Object> map = rawRecord.toMap();
+ assertEquals(expectedMap, map.toString());
+ }
+ }
+
+ @Test
+ void testReadRawRecordTypeCoercion() throws IOException,
MalformedRecordException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final Record schemaValidatedRecord = reader.nextRecord(true, true);
+ assertEquals("1", schemaValidatedRecord.getValue("id")); // will
be coerced into a STRING as per the schema
+ assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
+ assertNull(schemaValidatedRecord.getValue("balance"));
+
+ assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
+ }
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final Record rawRecord = reader.nextRecord(false, false);
+ assertEquals(1, rawRecord.getValue("id")); // will return raw
value of (int) 1
+ assertEquals("John Doe", rawRecord.getValue("name"));
+ assertEquals(4750.89, rawRecord.getValue("balance"));
+ assertEquals("123 My Street", rawRecord.getValue("address"));
+ assertEquals("My City", rawRecord.getValue("city"));
+ assertEquals("MS", rawRecord.getValue("state"));
+ assertEquals("11111", rawRecord.getValue("zipCode"));
+ assertEquals("USA", rawRecord.getValue("country"));
+
+ assertEquals(8, rawRecord.getRawFieldNames().size());
+ }
+ }
+
+ @Test
+ void testDateCoercedFromString() throws IOException,
MalformedRecordException {
+ final String dateField = "date";
+ final List<RecordField> recordFields = Collections.singletonList(new
RecordField(dateField, RecordFieldType.DATE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+ final String date = "2000-01-01";
+ final String datePattern = "yyyy-MM-dd";
+ final String yaml = String.format("%s: %s", dateField, date);
+ for (final boolean coerceTypes : new boolean[] {true, false}) {
+ try (final InputStream in = new
ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern,
timeFormat, timestampFormat)) {
+
+ final Record record = reader.nextRecord(coerceTypes, false);
+ final Object value = record.getValue(dateField);
+ assertTrue(value instanceof java.sql.Date, "With coerceTypes
set to " + coerceTypes + ", value is not a Date");
+ assertEquals(date, value.toString());
+ }
+ }
+ }
+
+ @Test
+ void testTimestampCoercedFromString() throws IOException,
MalformedRecordException {
+ final List<RecordField> recordFields = Collections.singletonList(new
RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+ for (final boolean coerceTypes : new boolean[] {true, false}) {
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/timestamp.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+
+ final Record record = reader.nextRecord(coerceTypes, false);
+ final Object value = record.getValue("timestamp");
+ assertTrue(value instanceof java.sql.Timestamp, "With
coerceTypes set to " + coerceTypes + ", value is not a Timestamp");
+ }
+ }
+ }
+
+ @Test
+ void testSingleJsonElement() throws IOException, MalformedRecordException {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/single-bank-account.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ void testSingleJsonElementWithChoiceFields() throws IOException,
MalformedRecordException {
+ // Wraps default fields by Choice data type to test mapping to a
Choice type.
+ final List<RecordField> choiceFields = getDefaultFields().stream()
+ .map(f -> new RecordField(f.getFieldName(),
RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
+ final RecordSchema schema = new SimpleRecordSchema(choiceFields);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/single-bank-account.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ final List<RecordField> fields = schema.getFields();
+ for (int i = 0; i < schema.getFields().size(); i++) {
+ assertTrue(fields.get(i).getDataType() instanceof
ChoiceDataType);
+ final ChoiceDataType choiceDataType = (ChoiceDataType)
fields.get(i).getDataType();
+ assertEquals(expectedTypes.get(i),
choiceDataType.getPossibleSubTypes().get(0).getFieldType());
+ }
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ void testElementWithNestedData() throws IOException,
MalformedRecordException {
+ final DataType accountType =
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("account", accountType));
+ fields.remove(new RecordField("balance",
RecordFieldType.DOUBLE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/single-element-nested.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.RECORD);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ final Object[] allButLast = Arrays.copyOfRange(firstRecordValues,
0, firstRecordValues.length - 1);
+ assertArrayEquals(new Object[] {1, "John Doe", "123 My Street",
"My City", "MS", "11111", "USA"}, allButLast);
+
+ final Object last = firstRecordValues[firstRecordValues.length -
1];
+ assertTrue(Record.class.isAssignableFrom(last.getClass()));
+ final Record record = (Record) last;
+ assertEquals(42, record.getValue("id"));
+ assertEquals(4750.89, record.getValue("balance"));
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ void testElementWithNestedArray() throws IOException,
MalformedRecordException {
+ final DataType accountRecordType =
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+ final DataType accountsType =
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("accounts", accountsType));
+ fields.remove(new RecordField("balance",
RecordFieldType.DOUBLE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/single-element-nested-array.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "address", "city", "state", "zipCode", "country", "accounts");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.ARRAY);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ final Object[] nonArrayValues =
Arrays.copyOfRange(firstRecordValues, 0, firstRecordValues.length - 1);
+ assertArrayEquals(new Object[] {1, "John Doe", "123 My Street",
"My City", "MS", "11111", "USA"}, nonArrayValues);
+
+ final Object lastRecord =
firstRecordValues[firstRecordValues.length - 1];
+ assertTrue(Object[].class.isAssignableFrom(lastRecord.getClass()));
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ void testReadArrayDifferentSchemas() throws IOException,
MalformedRecordException {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-array-different-schemas.yaml");
+ final JsonTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ final Object[] secondRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", null}, secondRecordValues);
+
+ final Object[] thirdRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {3, "Jake Doe", 4751.89, "124 My Street", "My City", "MS", "11111", "USA"}, thirdRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws
IOException, MalformedRecordException {
+ final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/bank-account-array-optional-balance.yaml");
+ final JsonTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final List<String> fieldNames = schema.getFieldNames();
+ final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
+ assertEquals(expectedFieldNames, fieldNames);
+
+ final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
RecordFieldType.DOUBLE, RecordFieldType.STRING,
+ RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING);
+ assertEquals(expectedTypes, dataTypes);
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+ final Object[] secondRecordValues =
reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {2, "Jane Doe", null, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+ final Object[] thirdRecordValues = reader.nextRecord().getValues();
+ assertArrayEquals(new Object[] {3, "Jimmy Doe", null, "321 Your Street", "Your City", "NY", "33333", "USA"}, thirdRecordValues);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+
+ @Test
+ void testReadUnicodeCharacters() throws IOException,
MalformedRecordException {
+
+ final List<RecordField> fromFields = new ArrayList<>();
+ fromFields.add(new RecordField("id",
RecordFieldType.LONG.getDataType()));
+ fromFields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ final RecordSchema fromSchema = new SimpleRecordSchema(fromFields);
+ final DataType fromType =
RecordFieldType.RECORD.getRecordDataType(fromSchema);
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("created_at",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+ fields.add(new RecordField("unicode",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("from", fromType));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/yaml-with-unicode.yaml");
+ final JsonTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ final Object[] firstRecordValues = reader.nextRecord().getValues();
+
+ final Object secondValue = firstRecordValues[1];
+ assertTrue(secondValue instanceof Long);
+ assertEquals(832036744985577473L, secondValue);
+
+ final Object unicodeValue = firstRecordValues[2];
+ assertEquals("\u3061\u3083\u6ce3\u304d\u305d\u3046", unicodeValue);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ void testIncorrectSchema() {
+ final DataType accountType =
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+ final List<RecordField> fields = getDefaultFields();
+ fields.add(new RecordField("account", accountType));
+ fields.remove(new RecordField("balance",
RecordFieldType.DOUBLE.getDataType()));
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ MalformedRecordException mre =
assertThrows(MalformedRecordException.class, () -> {
+ try (final InputStream in = new
FileInputStream("src/test/resources/yaml/single-bank-account-wrong-field-type.yaml");
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+
+ reader.nextRecord().getValues();
+ }
+ });
+
+ final String msg = mre.getCause().getMessage();
+ assertTrue(msg.contains("account.balance"));
+ assertTrue(msg.contains("true"));
+ assertTrue(msg.contains("Double"));
+ assertTrue(msg.contains("Boolean"));
+ }
+
+ @Test
+ void testMergeOfSimilarRecords() throws Exception {
+ // GIVEN
Review Comment:
The `GIVEN` and `WHEN/THEN` comments can be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]