exceptionfactory commented on code in PR #7665:
URL: https://github.com/apache/nifi/pull/7665#discussion_r1329269945


##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -31,29 +29,23 @@
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
     private static final Logger logger = 
LoggerFactory.getLogger(JsonRecordSource.class);
-    private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
     private final StartingFieldStrategy strategy;
-    private final String startingFieldName;
-
-    static {
-        jsonFactory = new JsonFactory();
-        jsonFactory.setCodec(new ObjectMapper());
-    }
 
     public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createParser(in);
-        strategy = null;
-        startingFieldName = null;
+        this(in, null, null);
     }
 
     public JsonRecordSource(final InputStream in, final StartingFieldStrategy 
strategy, final String startingFieldName) throws IOException {
-        jsonParser = jsonFactory.createParser(in);
+        this(in , strategy, startingFieldName, new JsonParserFactory());
+    }
+
+    public JsonRecordSource(final InputStream in, final StartingFieldStrategy 
strategy, final String startingFieldName, TokenParserFactory 
tokenParserFactory) throws IOException {
+        jsonParser = tokenParserFactory.getJsonParser(in);
         this.strategy = strategy;
-        this.startingFieldName = startingFieldName;
 
-        if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-            final SerializedString serializedNestedField = new 
SerializedString(this.startingFieldName);
+        if (StartingFieldStrategy.NESTED_FIELD.equals(strategy)) {

Review Comment:
   Enum values should be compared with `==` rather than `equals()`. Both work, 
but `==` is the idiomatic comparison for enum constants and is type-checked at compile time.
   ```suggestion
           if (StartingFieldStrategy.NESTED_FIELD == strategy) {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java:
##########
@@ -0,0 +1,1362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.yaml;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.json.JsonSchemaInference;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class TestYamlTreeRowRecordReader {
+    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+    private final String timestampFormat = 
RecordFieldType.TIMESTAMP.getDefaultFormat();
+
+    private List<RecordField> getDefaultFields() {
+        return getFields(RecordFieldType.DOUBLE.getDataType());
+    }
+
+    private List<RecordField> getFields(final DataType balanceDataType) {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("balance", balanceDataType));
+        fields.add(new RecordField("address", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("city", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("state", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("zipCode", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("country", 
RecordFieldType.STRING.getDataType()));
+        return fields;
+    }
+
+    private RecordSchema getAccountSchema() {
+        final List<RecordField> accountFields = new ArrayList<>();
+        accountFields.add(new RecordField("id", 
RecordFieldType.INT.getDataType()));
+        accountFields.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+
+        return new SimpleRecordSchema(accountFields);
+    }
+
+    @Test
+    void testReadChoiceOfStringOrArrayOfRecords() throws IOException, 
MalformedRecordException {
+        final File schemaFile = new 
File("src/test/resources/json/choice-of-string-or-array-record.avsc");
+        final File jsonFile = new 
File("src/test/resources/yaml/choice-of-string-or-array-record.yaml");
+
+        final Schema avroSchema = new Schema.Parser().parse(schemaFile);
+        final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema);
+
+        try (final InputStream fis = new FileInputStream(jsonFile);
+            final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final Record record = reader.nextRecord();
+            final Object[] fieldsArray = record.getAsArray("fields");
+            assertEquals(2, fieldsArray.length);
+
+            final Object firstElement = fieldsArray[0];
+            assertTrue(firstElement instanceof Record);
+            assertEquals("string", ((Record) 
firstElement).getAsString("type"));
+
+            final Object secondElement = fieldsArray[1];
+            assertTrue(secondElement instanceof Record);
+            final Object[] typeArray = ((Record) 
secondElement).getAsArray("type");
+            assertEquals(1, typeArray.length);
+
+            final Object firstType = typeArray[0];
+            assertTrue(firstType instanceof Record);
+            final Record firstTypeRecord = (Record) firstType;
+            assertEquals("string", firstTypeRecord.getAsString("type"));
+        }
+    }
+
+    @Test
+    void testChoiceOfRecordTypes() throws IOException, 
MalformedRecordException {
+        final Schema avroSchema = new Schema.Parser().parse(new 
File("src/test/resources/json/record-choice.avsc"));
+        final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/elements-for-record-choice.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            // evaluate first record
+            final Record firstRecord = reader.nextRecord();
+            assertNotNull(firstRecord);
+            final RecordSchema firstOuterSchema = firstRecord.getSchema();
+            assertEquals(Arrays.asList("id", "child"), 
firstOuterSchema.getFieldNames());
+            assertEquals("1234", firstRecord.getValue("id"));
+
+            // record should have a schema that indicates that the 'child' is 
a CHOICE of 2 different record types
+            assertSame(RecordFieldType.CHOICE, 
firstOuterSchema.getDataType("child").get().getFieldType());
+            final List<DataType> firstSubTypes = ((ChoiceDataType) 
firstOuterSchema.getDataType("child").get()).getPossibleSubTypes();
+            assertEquals(2, firstSubTypes.size());
+            assertEquals(2L, firstSubTypes.stream().filter(type -> 
type.getFieldType() == RecordFieldType.RECORD).count());
+
+            // child record should have a schema with "id" as the only field
+            final Object childObject = firstRecord.getValue("child");
+            assertTrue(childObject instanceof Record);
+            final Record firstChildRecord = (Record) childObject;
+            final RecordSchema firstChildSchema = firstChildRecord.getSchema();
+
+            assertEquals(Collections.singletonList("id"), 
firstChildSchema.getFieldNames());
+
+            // evaluate second record
+            final Record secondRecord = reader.nextRecord();
+            assertNotNull(secondRecord);
+
+            final RecordSchema secondOuterSchema = secondRecord.getSchema();
+            assertEquals(Arrays.asList("id", "child"), 
secondOuterSchema.getFieldNames());
+            assertEquals("1234", secondRecord.getValue("id"));
+
+            // record should have a schema that indicates that the 'child' is 
a CHOICE of 2 different record types
+            assertSame(RecordFieldType.CHOICE, 
secondOuterSchema.getDataType("child").get().getFieldType());
+            final List<DataType> secondSubTypes = ((ChoiceDataType) 
secondOuterSchema.getDataType("child").get()).getPossibleSubTypes();
+            assertEquals(2, secondSubTypes.size());
+            assertEquals(2L, secondSubTypes.stream().filter(type -> 
type.getFieldType() == RecordFieldType.RECORD).count());
+
+            // child record should have a schema with "name" as the only field
+            final Object secondChildObject = secondRecord.getValue("child");
+            assertTrue(secondChildObject instanceof Record);
+            final Record secondChildRecord = (Record) secondChildObject;
+            final RecordSchema secondChildSchema = 
secondChildRecord.getSchema();
+
+            assertEquals(Collections.singletonList("name"), 
secondChildSchema.getFieldNames());
+
+            assertNull(reader.nextRecord());
+        }
+
+    }
+
+    @Test
+    void testReadArray() throws IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("Not sure there is such a thing as one Yaml doc per line")
+    void testReadOneLinePerJSON() throws IOException, MalformedRecordException 
{
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-oneline.json");
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("TODO Determine whether this is possible in Yaml")
+    void testReadMultilineJSON() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = 
getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-multiline.json");
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DECIMAL, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 
BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, 
firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 
BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", 
"USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("TODO Determine whether this is possible in Yaml as two arrays 
may end up as one array")
+    void testReadMultilineArrays() throws IOException, 
MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-multiarray.json");
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My 
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
+
+            final Object[] fourthRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your 
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("Not sure this makes sense in Yaml")
+    void testReadMixedJSON() throws IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-mixed.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            //final Object[] thirdRecordValues = 
reader.nextRecord().getValues();
+            //assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My 
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
+
+            final Object[] fourthRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your 
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
+
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final Record schemaValidatedRecord = reader.nextRecord(true, true);
+            assertEquals(1, schemaValidatedRecord.getValue("id"));
+            assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
+            assertNull(schemaValidatedRecord.getValue("balance"));
+        }
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final Record rawRecord = reader.nextRecord(false, false);
+            assertEquals(1, rawRecord.getValue("id"));
+            assertEquals("John Doe", rawRecord.getValue("name"));
+            assertEquals(4750.89, rawRecord.getValue("balance"));
+            assertEquals("123 My Street", rawRecord.getValue("address"));
+            assertEquals("My City", rawRecord.getValue("city"));
+            assertEquals("MS", rawRecord.getValue("state"));
+            assertEquals("11111", rawRecord.getValue("zipCode"));
+            assertEquals("USA", rawRecord.getValue("country"));
+        }
+    }
+
+    @Test
+    void testReadRawRecordFieldOrderPreserved() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String expectedMap = "{id=1, name=John Doe, address=123 My 
Street, city=My City, state=MS, zipCode=11111, country=USA, 
account=MapRecord[{id=42, balance=4750.89}]}";
+        final String expectedRecord = String.format("MapRecord[%s]", 
expectedMap);
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/single-element-nested.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final Record rawRecord = reader.nextRecord(false, false);
+
+            assertEquals(expectedRecord, rawRecord.toString());
+
+            final Map<String, Object> map = rawRecord.toMap();
+            assertEquals(expectedMap, map.toString());
+        }
+    }
+
+    @Test
+    void testReadRawRecordTypeCoercion() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final Record schemaValidatedRecord = reader.nextRecord(true, true);
+            assertEquals("1", schemaValidatedRecord.getValue("id")); // will 
be coerced into a STRING as per the schema
+            assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
+            assertNull(schemaValidatedRecord.getValue("balance"));
+
+            assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
+        }
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final Record rawRecord = reader.nextRecord(false, false);
+            assertEquals(1, rawRecord.getValue("id")); // will return raw 
value of (int) 1
+            assertEquals("John Doe", rawRecord.getValue("name"));
+            assertEquals(4750.89, rawRecord.getValue("balance"));
+            assertEquals("123 My Street", rawRecord.getValue("address"));
+            assertEquals("My City", rawRecord.getValue("city"));
+            assertEquals("MS", rawRecord.getValue("state"));
+            assertEquals("11111", rawRecord.getValue("zipCode"));
+            assertEquals("USA", rawRecord.getValue("country"));
+
+            assertEquals(8, rawRecord.getRawFieldNames().size());
+        }
+    }
+
+    @Test
+    void testDateCoercedFromString() throws IOException, 
MalformedRecordException {
+        final String dateField = "date";
+        final List<RecordField> recordFields = Collections.singletonList(new 
RecordField(dateField, RecordFieldType.DATE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+        final String date = "2000-01-01";
+        final String datePattern = "yyyy-MM-dd";
+        final String yaml = String.format("%s: %s", dateField, date);
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream in = new 
ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8));
+                 final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, 
timeFormat, timestampFormat)) {
+
+                final Record record = reader.nextRecord(coerceTypes, false);
+                final Object value = record.getValue(dateField);
+                assertTrue(value instanceof java.sql.Date, "With coerceTypes 
set to " + coerceTypes + ", value is not a Date");
+                assertEquals(date, value.toString());
+            }
+        }
+    }
+
+    @Test
+    void testTimestampCoercedFromString() throws IOException, 
MalformedRecordException {
+        final List<RecordField> recordFields = Collections.singletonList(new 
RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(recordFields);
+
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/timestamp.yaml");
+                 final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+
+                final Record record = reader.nextRecord(coerceTypes, false);
+                final Object value = record.getValue("timestamp");
+                assertTrue(value instanceof java.sql.Timestamp, "With 
coerceTypes set to " + coerceTypes + ", value is not a Timestamp");
+            }
+        }
+    }
+
+    @Test
+    void testSingleJsonElement() throws IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/single-bank-account.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testSingleJsonElementWithChoiceFields() throws IOException, 
MalformedRecordException {
+        // Wraps default fields by Choice data type to test mapping to a 
Choice type.
+        final List<RecordField> choiceFields = getDefaultFields().stream()
+                .map(f -> new RecordField(f.getFieldName(), 
RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
+        final RecordSchema schema = new SimpleRecordSchema(choiceFields);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/single-bank-account.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            final List<RecordField> fields = schema.getFields();
+            for (int i = 0; i < schema.getFields().size(); i++) {
+                assertTrue(fields.get(i).getDataType() instanceof 
ChoiceDataType);
+                final ChoiceDataType choiceDataType = (ChoiceDataType) 
fields.get(i).getDataType();
+                assertEquals(expectedTypes.get(i), 
choiceDataType.getPossibleSubTypes().get(0).getFieldType());
+            }
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testElementWithNestedData() throws IOException, 
MalformedRecordException {
+        final DataType accountType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("account", accountType));
+        fields.remove(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/single-element-nested.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.RECORD);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            final Object[] allButLast = Arrays.copyOfRange(firstRecordValues, 
0, firstRecordValues.length - 1);
+            assertArrayEquals(new Object[] {1, "John Doe", "123 My Street", 
"My City", "MS", "11111", "USA"}, allButLast);
+
+            final Object last = firstRecordValues[firstRecordValues.length - 
1];
+            assertTrue(Record.class.isAssignableFrom(last.getClass()));
+            final Record record = (Record) last;
+            assertEquals(42, record.getValue("id"));
+            assertEquals(4750.89, record.getValue("balance"));
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testElementWithNestedArray() throws IOException, 
MalformedRecordException {
+        final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        fields.remove(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/single-element-nested-array.yaml");
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "address", "city", "state", "zipCode", "country", "accounts");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.ARRAY);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            final Object[] nonArrayValues = 
Arrays.copyOfRange(firstRecordValues, 0, firstRecordValues.length - 1);
+            assertArrayEquals(new Object[] {1, "John Doe", "123 My Street", 
"My City", "MS", "11111", "USA"}, nonArrayValues);
+
+            final Object lastRecord = 
firstRecordValues[firstRecordValues.length - 1];
+            assertTrue(Object[].class.isAssignableFrom(lastRecord.getClass()));
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testReadArrayDifferentSchemas() throws IOException, 
MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-array-different-schemas.yaml");
+             final JsonTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", null}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {3, "Jake Doe", 4751.89, "124 My 
Street", "My City", "MS", "11111", "USA"}, thirdRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws 
IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/bank-account-array-optional-balance.yaml");
+             final JsonTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, 
RecordFieldType.DOUBLE, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", null, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {3, "Jimmy Doe", null, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, thirdRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+
+    @Test
+    void testReadUnicodeCharacters() throws IOException, 
MalformedRecordException {
+
+        final List<RecordField> fromFields = new ArrayList<>();
+        fromFields.add(new RecordField("id", 
RecordFieldType.LONG.getDataType()));
+        fromFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema fromSchema = new SimpleRecordSchema(fromFields);
+        final DataType fromType = 
RecordFieldType.RECORD.getRecordDataType(fromSchema);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("created_at", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+        fields.add(new RecordField("unicode", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("from", fromType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/yaml-with-unicode.yaml");
+             final JsonTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+
+            final Object secondValue = firstRecordValues[1];
+            assertTrue(secondValue instanceof Long);
+            assertEquals(832036744985577473L, secondValue);
+
+            final Object unicodeValue = firstRecordValues[2];
+            assertEquals("\u3061\u3083\u6ce3\u304d\u305d\u3046", unicodeValue);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testIncorrectSchema() {
+        final DataType accountType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("account", accountType));
+        fields.remove(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        MalformedRecordException mre = 
assertThrows(MalformedRecordException.class, () -> {
+            try (final InputStream in = new 
FileInputStream("src/test/resources/yaml/single-bank-account-wrong-field-type.yaml");
+                 final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+
+                reader.nextRecord().getValues();
+            }
+        });
+
+        final String msg = mre.getCause().getMessage();
+        assertTrue(msg.contains("account.balance"));
+        assertTrue(msg.contains("true"));
+        assertTrue(msg.contains("Double"));
+        assertTrue(msg.contains("Boolean"));
+    }
+
+    @Test
+    void testMergeOfSimilarRecords() throws Exception {
+        // GIVEN

Review Comment:
   The `GIVEN` and `WHEN/THEN` comments can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to