This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 6ce63c0bfe NIFI-11197 Added YamlTreeReader
6ce63c0bfe is described below

commit 6ce63c0bfe615bc9d7d9b6987b464130791bb5aa
Author: dan-s1 <[email protected]>
AuthorDate: Mon Oct 16 18:14:49 2023 +0000

    NIFI-11197 Added YamlTreeReader
    
    - Adjusted JsonTreeReader implementation for sharing common Jackson components
    
    This closes #7665
    
    Signed-off-by: David Handermann <[email protected]>
    (cherry picked from commit 4b95129f962b0dfa2b1119f54d096c3a8464cfa5)
---
 .../nifi/json/AbstractJsonRowRecordReader.java     |  41 +-
 .../org/apache/nifi/json/JsonParserFactory.java    |  42 ++
 .../apache/nifi/json/JsonPathRowRecordReader.java  |   2 +-
 .../org/apache/nifi/json/JsonRecordSource.java     |  28 +-
 .../apache/nifi/json/JsonTreeRowRecordReader.java  |  18 +-
 .../org/apache/nifi/json/TokenParserFactory.java   |  36 ++
 .../nifi-yaml-record-utils/pom.xml                 |  45 ++
 .../org/apache/nifi/yaml/YamlParserFactory.java    |  44 ++
 .../org/apache/nifi/yaml/YamlRecordSource.java     |  29 +
 .../apache/nifi/yaml/YamlTreeRowRecordReader.java  |  47 ++
 .../nifi-extension-utils/nifi-record-utils/pom.xml |   1 +
 .../nifi-record-serialization-services/pom.xml     |   6 +
 .../java/org/apache/nifi/json/JsonTreeReader.java  |  51 +-
 .../java/org/apache/nifi/yaml/YamlTreeReader.java  |  81 +++
 .../org.apache.nifi.controller.ControllerService   |   1 +
 .../additionalDetails.html                         |   2 +-
 .../additionalDetails.html                         | 176 +++---
 .../nifi/json/TestJsonTreeRowRecordReader.java     |   3 +-
 .../TestYamlTreeRowRecordReader.java}              | 600 +++++++--------------
 .../yaml/bank-account-array-different-schemas.yaml |  24 +
 .../yaml/bank-account-array-optional-balance.yaml  |  23 +
 .../test/resources/yaml/bank-account-array.yaml    |  16 +
 .../test/resources/yaml/bank-account-comments.yaml |  20 +
 .../src/test/resources/yaml/capture-fields.yaml    |  15 +
 ...oice-of-different-arrays-with-extra-fields.yaml |  15 +
 ...oice-of-embedded-arrays-and-single-records.yaml |  16 +
 .../yaml/choice-of-embedded-similar-records.yaml   |   7 +
 ...-merged-embedded-arrays-and-single-records.yaml |  19 +
 .../yaml/choice-of-string-or-array-record.yaml     |   4 +
 .../resources/yaml/elements-for-record-choice.yaml |   6 +
 .../test/resources/yaml/multiple-nested-field.yaml |  13 +
 .../yaml/nested-array-then-start-object.yaml       |   9 +
 .../src/test/resources/yaml/similar-records.yaml   |   7 +
 .../yaml/single-bank-account-wrong-field-type.yaml |  10 +
 .../test/resources/yaml/single-bank-account.yaml   |   8 +
 .../resources/yaml/single-element-deep-nested.yaml |   8 +
 .../yaml/single-element-nested-array.yaml          |  12 +
 .../test/resources/yaml/single-element-nested.yaml |  10 +
 .../src/test/resources/yaml/timestamp.yaml         |   2 +
 .../src/test/resources/yaml/yaml-with-unicode.yaml |   6 +
 40 files changed, 953 insertions(+), 550 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index d718e7e4bc..e3f6d8567e 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.core.StreamReadConstraints;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.logging.ComponentLog;
@@ -56,7 +55,7 @@ import java.util.function.Supplier;
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
     public static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
 
-    static final PropertyDescriptor MAX_STRING_LENGTH = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor MAX_STRING_LENGTH = new 
PropertyDescriptor.Builder()
             .name("Max String Length")
             .displayName("Max String Length")
             .description("The maximum allowed length of a string value when 
parsing the JSON document")
@@ -88,7 +87,6 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
     private JsonParser jsonParser;
     private JsonNode firstJsonNode;
     private StartingFieldStrategy strategy;
-
     private Map<String, String> capturedFields;
     private BiPredicate<String, String> captureFieldPredicate;
 
@@ -104,28 +102,6 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         lazyTimestampFormat = () -> tsf;
     }
 
-    protected AbstractJsonRowRecordReader(final InputStream in,
-                                          final ComponentLog logger,
-                                          final String dateFormat,
-                                          final String timeFormat,
-                                          final String timestampFormat)
-            throws IOException, MalformedRecordException {
-
-        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, 
null, false, null);
-    }
-
-    protected AbstractJsonRowRecordReader(final InputStream in,
-                                          final ComponentLog logger,
-                                          final String dateFormat,
-                                          final String timeFormat,
-                                          final String timestampFormat,
-                                          final boolean allowComments,
-                                          final StreamReadConstraints 
streamReadConstraints)
-            throws IOException, MalformedRecordException {
-
-        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, 
null, allowComments, streamReadConstraints);
-    }
-
     /**
      * Constructor with initial logic for JSON to NiFi record parsing.
      *
@@ -140,7 +116,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
      *                               be accessed by calling {@link 
#getCapturedFields()}
      * @param allowComments          whether to allow comments within the JSON 
stream
      * @param streamReadConstraints  configuration for the JsonFactory stream 
reader {@link StreamReadConstraints}
-     *
+     * @param tokenParserFactory     factory to provide an instance of 
com.fasterxml.jackson.core.JsonParser
      * @throws IOException              in case of JSON stream processing 
failure
      * @throws MalformedRecordException in case of malformed JSON input
      */
@@ -153,7 +129,8 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
                                           final String nestedFieldName,
                                           final BiPredicate<String, String> 
captureFieldPredicate,
                                           final boolean allowComments,
-                                          final StreamReadConstraints 
streamReadConstraints)
+                                          final StreamReadConstraints 
streamReadConstraints,
+                                          final TokenParserFactory 
tokenParserFactory)
             throws IOException, MalformedRecordException {
 
         this(logger, dateFormat, timeFormat, timestampFormat);
@@ -163,14 +140,8 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         capturedFields = new LinkedHashMap<>();
 
         try {
-            final ObjectMapper codec = new ObjectMapper();
-            if (allowComments) {
-                codec.enable(JsonParser.Feature.ALLOW_COMMENTS);
-            }
-            codec.getFactory().setStreamReadConstraints(streamReadConstraints 
!= null ? streamReadConstraints : DEFAULT_STREAM_READ_CONSTRAINTS);
-
-            jsonParser = codec.getFactory().createParser(in);
-            jsonParser.setCodec(codec);
+            final StreamReadConstraints configuredStreamReadConstraints = 
streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS : 
streamReadConstraints;
+            jsonParser = tokenParserFactory.getJsonParser(in, 
configuredStreamReadConstraints, allowComments);
 
             if (strategy == StartingFieldStrategy.NESTED_FIELD) {
                 while (jsonParser.nextToken() != null) {
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java
new file mode 100644
index 0000000000..d2e7484b3d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
+public class JsonParserFactory implements TokenParserFactory {
+    @Override
+    public JsonParser getJsonParser(final InputStream in, final 
StreamReadConstraints streamReadConstraints, final boolean allowComments) 
throws IOException {
+        Objects.requireNonNull(in, "Input Stream required");
+        Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints 
required");
+
+        final ObjectMapper objectMapper = new ObjectMapper();
+        
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+        if (allowComments) {
+            objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
+        }
+        final JsonFactory jsonFactory = objectMapper.getFactory();
+        return jsonFactory.createParser(in);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 328cf0c868..bc9ba69800 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -66,7 +66,7 @@ public class JsonPathRowRecordReader extends 
AbstractJsonRowRecordReader {
                                    final boolean allowComments, final 
StreamReadConstraints streamReadConstraints)
                 throws MalformedRecordException, IOException {
 
-        super(in, logger, dateFormat, timeFormat, timestampFormat, 
allowComments, streamReadConstraints);
+        super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, 
null, allowComments, streamReadConstraints, new JsonParserFactory());
 
         this.schema = schema;
         this.jsonPaths = jsonPaths;
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
index 348c2ef02f..5ff735fbdd 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
@@ -16,12 +16,11 @@
  */
 package org.apache.nifi.json;
 
-import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.StreamReadConstraints;
 import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,29 +30,28 @@ import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
     private static final Logger logger = 
LoggerFactory.getLogger(JsonRecordSource.class);
-    private static final JsonFactory jsonFactory;
+
+    private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS 
= StreamReadConstraints.defaults();
+
+    private static final boolean ALLOW_COMMENTS_ENABLED = true;
+
     private final JsonParser jsonParser;
     private final StartingFieldStrategy strategy;
-    private final String startingFieldName;
-
-    static {
-        jsonFactory = new JsonFactory();
-        jsonFactory.setCodec(new ObjectMapper());
-    }
 
     public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createParser(in);
-        strategy = null;
-        startingFieldName = null;
+        this(in, null, null);
     }
 
     public JsonRecordSource(final InputStream in, final StartingFieldStrategy 
strategy, final String startingFieldName) throws IOException {
-        jsonParser = jsonFactory.createParser(in);
+        this(in , strategy, startingFieldName, new JsonParserFactory());
+    }
+
+    public JsonRecordSource(final InputStream in, final StartingFieldStrategy 
strategy, final String startingFieldName, TokenParserFactory 
tokenParserFactory) throws IOException {
+        jsonParser = tokenParserFactory.getJsonParser(in, 
DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);
         this.strategy = strategy;
-        this.startingFieldName = startingFieldName;
 
         if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-            final SerializedString serializedNestedField = new 
SerializedString(this.startingFieldName);
+            final SerializedString serializedNestedField = new 
SerializedString(startingFieldName);
             while (!jsonParser.nextFieldName(serializedNestedField) && 
jsonParser.hasCurrentToken());
             logger.debug("Parsing starting at nested field [{}]", 
startingFieldName);
         }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 758cd96a48..489474e804 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -56,16 +56,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                                    final String dateFormat, final String 
timeFormat, final String timestampFormat)
             throws IOException, MalformedRecordException {
 
-        this(in, logger, schema, dateFormat, timeFormat, timestampFormat, 
false, null);
-    }
-
-    public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
-                                   final String dateFormat, final String 
timeFormat, final String timestampFormat,
-                                   final boolean allowComments, final 
StreamReadConstraints streamReadConstraints)
-            throws IOException, MalformedRecordException {
-
-        this(in, logger, schema, dateFormat, timeFormat, timestampFormat, 
null, null, null, null,
-                allowComments, streamReadConstraints);
+        this(in, logger, schema, dateFormat, timeFormat, timestampFormat,  
null, null, null, null);
     }
 
     public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
@@ -75,18 +66,18 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
             throws IOException, MalformedRecordException {
 
         this(in, logger, schema, dateFormat, timeFormat, timestampFormat, 
startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
-                captureFieldPredicate, false, null);
+                captureFieldPredicate, false, null, new JsonParserFactory());
     }
 
     public JsonTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
                                    final String dateFormat, final String 
timeFormat, final String timestampFormat,
                                    final StartingFieldStrategy 
startingFieldStrategy, final String startingFieldName,
                                    final SchemaApplicationStrategy 
schemaApplicationStrategy, final BiPredicate<String, String> 
captureFieldPredicate,
-                                   final boolean allowComments, final 
StreamReadConstraints streamReadConstraints)
+                                   final boolean allowComments, final 
StreamReadConstraints streamReadConstraints, final TokenParserFactory 
tokenParserFactory)
             throws IOException, MalformedRecordException {
 
         super(in, logger, dateFormat, timeFormat, timestampFormat, 
startingFieldStrategy, startingFieldName, captureFieldPredicate,
-                allowComments, streamReadConstraints);
+                allowComments, streamReadConstraints, tokenParserFactory);
 
         if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && 
schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
             this.schema = getSelectedSchema(schema, startingFieldName);
@@ -110,7 +101,6 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                     }
                 }
             }
-
         }
         throw new RuntimeException(String.format("Selected schema field [%s] 
not found.", startingFieldName));
     }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java
new file mode 100644
index 0000000000..1c3a812fe2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface TokenParserFactory {
+    /**
+     * Get JSON Parser implementation for provided Input Stream with 
configured settings
+     *
+     * @param in Input Stream to be parsed
+     * @param streamReadConstraints Stream Read Constraints applied
+     * @param allowComments Whether to allow comments when parsing
+     * @return JSON Parser
+     * @throws IOException Thrown on failures to read the Input Stream
+     */
+    JsonParser getJsonParser(InputStream in, StreamReadConstraints 
streamReadConstraints, boolean allowComments) throws IOException;
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/pom.xml
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/pom.xml
new file mode 100644
index 0000000000..4e69015d99
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-record-utils</artifactId>
+        <version>1.24.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-yaml-record-utils</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-record-utils</artifactId>
+            <version>1.24.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java
new file mode 100644
index 0000000000..a6df6c198a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.yaml;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+import org.apache.nifi.json.TokenParserFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class YamlParserFactory implements TokenParserFactory {
+    private static final YAMLFactory YAML_FACTORY = new YAMLFactory(new 
YAMLMapper());
+
+    /**
+     * Get Parser implementation for YAML
+     *
+     * @param in Input Stream to be parsed
+     * @param streamReadConstraints Stream Read Constraints are not supported 
in YAML
+     * @param allowComments Whether to allow comments when parsing does not 
apply to YAML
+     * @return YAML Parser
+     * @throws IOException Thrown on parser creation failures
+     */
+    @Override
+    public JsonParser getJsonParser(final InputStream in, final 
StreamReadConstraints streamReadConstraints, final boolean allowComments) 
throws IOException {
+        return YAML_FACTORY.createParser(in);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java
new file mode 100644
index 0000000000..f47a591de6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.yaml;
+
+import org.apache.nifi.json.JsonRecordSource;
+import org.apache.nifi.json.StartingFieldStrategy;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class YamlRecordSource extends JsonRecordSource {
+   public YamlRecordSource(final InputStream in, final StartingFieldStrategy 
strategy, final String startingFieldName) throws IOException {
+       super(in, strategy, startingFieldName, new YamlParserFactory());
+   }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java
new file mode 100644
index 0000000000..25136d85aa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.yaml;
+
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.BiPredicate;
+
+public class YamlTreeRowRecordReader extends JsonTreeRowRecordReader {
+
+    public YamlTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
+                                   final String dateFormat, final String 
timeFormat, final String timestampFormat) throws IOException, 
MalformedRecordException {
+        this(in, logger, schema, dateFormat, timeFormat, timestampFormat, 
null, null, null, null);
+    }
+
+    public YamlTreeRowRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema,
+                                   final String dateFormat, final String 
timeFormat, final String timestampFormat,
+                                   final StartingFieldStrategy 
startingFieldStrategy, final String startingFieldName,
+                                   final SchemaApplicationStrategy 
schemaApplicationStrategy, final BiPredicate<String, String> 
captureFieldPredicate)
+            throws IOException, MalformedRecordException {
+
+        super(in, logger, schema, dateFormat, timeFormat, timestampFormat, 
startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
+                captureFieldPredicate, true, null, new YamlParserFactory());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
index 1c76866fec..e14cd236c2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
@@ -30,6 +30,7 @@
         <module>nifi-json-record-utils</module>
              <module>nifi-mock-record-utils</module>
         <module>nifi-schema-inference-utils</module>
+        <module>nifi-yaml-record-utils</module>
     </modules>
 
     <dependencyManagement>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index e5064f1bfb..4517215d8b 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -66,6 +66,11 @@
             <artifactId>nifi-json-record-utils</artifactId>
             <version>1.24.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-yaml-record-utils</artifactId>
+            <version>1.24.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-csv</artifactId>
@@ -261,6 +266,7 @@
                         <exclude>src/test/resources/xml/testschema</exclude>
                         <exclude>src/test/resources/xml/testschema2</exclude>
                         <exclude>src/test/resources/xml/testschema3</exclude>
+                        <exclude>src/test/resources/yaml/*.yaml</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index bcf7d66c95..596b3880bd 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -70,12 +70,12 @@ import static 
org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
         + "See the Usage of the Controller Service for more information and 
examples.")
 @SeeAlso(JsonPathReader.class)
 public class JsonTreeReader extends SchemaRegistryService implements 
RecordReaderFactory {
-    private volatile String dateFormat;
-    private volatile String timeFormat;
-    private volatile String timestampFormat;
-    private volatile String startingFieldName;
-    private volatile StartingFieldStrategy startingFieldStrategy;
-    private volatile SchemaApplicationStrategy schemaApplicationStrategy;
+    protected volatile String dateFormat;
+    protected volatile String timeFormat;
+    protected volatile String timestampFormat;
+    protected volatile String startingFieldName;
+    protected volatile StartingFieldStrategy startingFieldStrategy;
+    protected volatile SchemaApplicationStrategy schemaApplicationStrategy;
     private volatile boolean allowComments;
     private volatile StreamReadConstraints streamReadConstraints;
 
@@ -138,9 +138,29 @@ public class JsonTreeReader extends SchemaRegistryService 
implements RecordReade
         this.startingFieldStrategy = 
StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
         this.startingFieldName = 
context.getProperty(STARTING_FIELD_NAME).getValue();
         this.schemaApplicationStrategy = 
SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue());
+        this.streamReadConstraints = buildStreamReadConstraints(context);
+        this.allowComments = isAllowCommentsEnabled(context);
+    }
+
+    /**
+     * Build Stream Read Constraints based on available properties
+     *
+     * @param context Configuration Context with property values
+     * @return Stream Read Constraints
+     */
+    protected StreamReadConstraints buildStreamReadConstraints(final 
ConfigurationContext context) {
         final int maxStringLength = 
context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
-        this.streamReadConstraints = 
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
-        this.allowComments = 
context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
+        return 
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+    }
+
+    /**
+     * Determine whether to allow comments when parsing based on available 
properties
+     *
+     * @param context Configuration Context with property values
+     * @return Allow comments status
+     */
+    protected boolean isAllowCommentsEnabled(final ConfigurationContext 
context) {
+        return 
context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
     }
 
     @Override
@@ -153,9 +173,7 @@ public class JsonTreeReader extends SchemaRegistryService 
implements RecordReade
 
     @Override
     protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
schemaAccessStrategy, final SchemaRegistry schemaRegistry, final 
PropertyContext context) {
-        final RecordSourceFactory<JsonNode> jsonSourceFactory =
-                (var, in) -> new JsonRecordSource(in, startingFieldStrategy, 
startingFieldName);
-
+        final RecordSourceFactory<JsonNode> jsonSourceFactory = 
createJsonRecordSourceFactory();
         final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier =
                 () -> new JsonSchemaInference(new 
TimeValueInference(dateFormat, timeFormat, timestampFormat));
 
@@ -163,16 +181,23 @@ public class JsonTreeReader extends SchemaRegistryService 
implements RecordReade
                 () -> super.getSchemaAccessStrategy(schemaAccessStrategy, 
schemaRegistry, context));
     }
 
+    protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
+        return (variables, in) -> new JsonRecordSource(in, 
startingFieldStrategy, startingFieldName);
+    }
+
     @Override
     protected AllowableValue getDefaultSchemaAccessStrategy() {
         return INFER_SCHEMA;
     }
 
-    @Override
     public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final long inputLength, final ComponentLog 
logger)
             throws IOException, MalformedRecordException, 
SchemaNotFoundException {
         final RecordSchema schema = getSchema(variables, in, null);
+        return createJsonTreeRowRecordReader(in, logger, schema);
+    }
+
+    protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(final 
InputStream in, final ComponentLog logger, final RecordSchema schema) throws 
IOException, MalformedRecordException {
         return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, 
timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
-                schemaApplicationStrategy, null, allowComments, 
streamReadConstraints);
+                schemaApplicationStrategy, null, allowComments, 
streamReadConstraints, new JsonParserFactory());
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java
new file mode 100644
index 0000000000..ab1a5682ed
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.yaml;
+
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.json.AbstractJsonRowRecordReader;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Record reader factory for YAML content, implemented as a specialization of {@link JsonTreeReader}.
+ * The MAX_STRING_LENGTH and ALLOW_COMMENTS properties are removed from the supported property list and
+ * replaced by fixed values, and the record source and row record reader are swapped for
+ * {@link YamlRecordSource} and {@link YamlTreeRowRecordReader}.
+ */
+@Tags({"yaml", "tree", "record", "reader", "parser"})
+@CapabilityDescription("Parses YAML into individual Record objects. While the reader expects each record "
+        + "to be well-formed YAML, the content of a FlowFile may consist of many records, each as a well-formed "
+        + "YAML array or YAML object. "
+        + "If an array is encountered, each element in that array will be treated as a separate record. "
+        + "If the schema that is configured contains a field that is not present in the YAML, a null value will be used. If the YAML contains "
+        + "a field that is not present in the schema, that field will be skipped. "
+        + "See the Usage of the Controller Service for more information and examples.")
+public class YamlTreeReader extends JsonTreeReader {
+
+    private static final boolean ALLOW_COMMENTS_DISABLED = false;
+
+    /**
+     * Build the supported property list from the inherited one, minus the properties that do not apply to YAML
+     *
+     * @return New mutable list of Property Descriptors without MAX_STRING_LENGTH and ALLOW_COMMENTS
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        // Remove those properties which are not applicable for YAML
+        properties.remove(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
+        properties.remove(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
+
+        return properties;
+    }
+
+    /**
+     * Create the Record Source Factory that the parent class requests when building its schema access strategy
+     *
+     * @return Factory producing a YamlRecordSource configured with the current starting field strategy and name
+     */
+    @Override
+    protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
+        // Parameter named "variables" to match the parent implementation; "var" is a restricted identifier since Java 10
+        return (variables, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName);
+    }
+
+    /**
+     * Create the row record reader that the parent class returns from createRecordReader
+     *
+     * @param in Input Stream containing the YAML content
+     * @param logger Component Log passed through to the reader
+     * @param schema Record Schema resolved by the parent class
+     * @return YamlTreeRowRecordReader configured with the inherited format and starting field settings and no capture field predicate
+     * @throws IOException declared by the YamlTreeRowRecordReader constructor
+     * @throws MalformedRecordException declared by the YamlTreeRowRecordReader constructor
+     */
+    @Override
+    protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema) throws IOException, MalformedRecordException {
+        return new YamlTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
+                schemaApplicationStrategy, null);
+    }
+
+    /**
+     * Return default Stream Read Constraints because MAX_STRING_LENGTH is not a supported property of this reader
+     *
+     * @param context Configuration Context, not consulted
+     * @return Default Stream Read Constraints
+     */
+    @Override
+    protected StreamReadConstraints buildStreamReadConstraints(final ConfigurationContext context) {
+        return StreamReadConstraints.defaults();
+    }
+
+    /**
+     * Return a fixed disabled status because ALLOW_COMMENTS is not a supported property of this reader
+     *
+     * @param context Configuration Context, not consulted
+     * @return Always false
+     */
+    @Override
+    protected boolean isAllowCommentsEnabled(final ConfigurationContext context) {
+        return ALLOW_COMMENTS_DISABLED;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 7c391e2443..0b8484c3db 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -38,3 +38,4 @@ org.apache.nifi.xml.XMLReader
 org.apache.nifi.xml.XMLRecordSetWriter
 org.apache.nifi.windowsevent.WindowsEventLogReader
 org.apache.nifi.schema.inference.VolatileSchemaCache
+org.apache.nifi.yaml.YamlTreeReader
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
index f46b523758..d80f371913 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
@@ -411,7 +411,7 @@
 
         <p>
             When using JsonTreeReader with "Nested Field Strategy" and the 
"Schema Access Strategy" is not "Infer Schema",
-            it can be configured for the entire original JSON ("Whole JSON" 
strategy) or for the nested field section ("Selected part" strategy).
+            it can be configured for the entire original JSON ("Whole 
document" strategy) or for the nested field section ("Selected part" strategy).
         </p>
 
     </body>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.yaml.YamlTreeReader/additionalDetails.html
similarity index 83%
copy from 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
copy to 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.yaml.YamlTreeReader/additionalDetails.html
index f46b523758..ece3862e05 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.yaml.YamlTreeReader/additionalDetails.html
@@ -16,23 +16,23 @@
     -->
     <head>
         <meta charset="utf-8"/>
-        <title>JsonTreeReader</title>
+        <title>YamlTreeReader</title>
         <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
     </head>
 
     <body>
         <p>
-            The JsonTreeReader Controller Service reads a JSON Object and 
creates a Record object either for the
-            entire JSON Object tree or a subpart (see "Starting Field 
Strategies" section). The Controller Service
-            must be configured with a Schema that describes the structure of 
the JSON data. If any field exists in
-            the JSON that is not in the schema, that field will be skipped. If 
the schema contains a field for which
-            no JSON field exists, a null value will be used in the Record (or 
the default value defined in the schema,
+            The YamlTreeReader Controller Service reads a YAML Object and 
creates a Record object either for the
+            entire YAML Object tree or a subpart (see "Starting Field 
Strategies" section). The Controller Service
+            must be configured with a Schema that describes the structure of 
the YAML data. If any field exists in
+            the YAML that is not in the schema, that field will be skipped. If 
the schema contains a field for which
+            no YAML field exists, a null value will be used in the Record (or 
the default value defined in the schema,
             if applicable).
         </p>
 
         <p>
-            If the root element of the JSON is a JSON Array, each JSON Object 
within that array will be treated as
-            its own separate Record. If the root element is a JSON Object, the 
JSON will all be treated as a single
+            If the root element of the YAML is a YAML Array, each YAML Object 
within that array will be treated as
+            its own separate Record. If the root element is a YAML Object, the 
YAML will all be treated as a single
             Record.
         </p>
 
@@ -172,92 +172,128 @@
         <h2>Starting Field Strategies</h2>
 
         <p>
-            When using JsonTreeReader, two different starting field strategies 
can be selected. With the default Root Node strategy, the JsonTreeReader begins 
processing from the root element
-            of the JSON and creates a Record object for the entire JSON Object 
tree, while the Nested Field strategy defines a nested field from which to 
begin processing.
+            When using YamlTreeReader, two different starting field strategies 
can be selected. With the default Root Node strategy, the YamlTreeReader begins 
processing from the root element
+            of the YAML and creates a Record object for the entire YAML Object 
tree, while the Nested Field strategy defines a nested field from which to 
begin processing.
         </p>
         <p>
-            Using the Nested Field strategy, a schema corresponding to the 
nested JSON part should be specified. In case of schema inference, the 
JsonTreeReader will automatically
+            Using the Nested Field strategy, a schema corresponding to the 
nested YAML part should be specified. In case of schema inference, the 
YamlTreeReader will automatically
             infer a schema from nested records.
         </p>
 
         <h3>Root Node Strategy</h3>
 
         <p>
-            Consider the following JSON is read with the default Root Node 
strategy:
+            Consider the following YAML, read with the default Root Node 
strategy:
         </p>
 <code>
 <pre>
-[
-  {
-    "id": 17,
-    "name": "John",
-    "child": {
-      "id": "1"
-    },
-    "dob": "10-29-1982",
-    "siblings": [
-      {
-        "name": "Jeremy",
-        "id": 4
-      },
-      {
-        "name": "Julia",
-        "id": 8
-      }
-    ]
-  },
-  {
-    "id": 98,
-    "name": "Jane",
-    "child": {
-      "id": 2
-    },
-    "dob": "08-30-1984",
-    "gender": "F",
-    "siblingIds": [],
-    "siblings": []
-  }
-]
+- id: 17
+  name: John
+  child:
+    id: "1"
+  dob: 10-29-1982
+  siblings:
+    - name: Jeremy
+      id: 4
+    - name: Julia
+      id: 8
+- id: 98
+  name: Jane
+  child:
+    id: 2
+  dob: 08-30-1984
+  gender: F
+  siblingIds: []
+  siblings: []
 </pre>
 </code>
 
         <p>
-            Also, consider that the schema that is configured for this JSON is 
as follows (assuming that the AvroSchemaRegistry
+            Also, consider that the schema that is configured for this YAML is 
as follows (assuming that the AvroSchemaRegistry
             Controller Service is chosen to denote the Schema):
         </p>
 
 <code>
 <pre>
 {
-    "namespace": "nifi",
-    "name": "person",
-    "type": "record",
-    "fields": [
-        { "name": "id", "type": "int" },
-        { "name": "name", "type": "string" },
-        { "name": "gender", "type": "string" },
-        { "name": "dob", "type": {
-            "type": "int",
-            "logicalType": "date"
-        }},
-        { "name": "siblings", "type": {
-            "type": "array",
-            "items": {
-                "type": "record",
-                "fields": [
-                    { "name": "name", "type": "string" }
-                ]
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [
+    {
+      "name": "id",
+      "type": ["int","null"]
+    },
+    {
+      "name": "name",
+      "type": ["string","null"]
+    },
+    {
+      "name": "child",
+      "type": [
+        {
+          "type": "record",
+          "name": "childType",
+          "fields": [
+            {
+              "name": "id",
+              "type": ["int","string","null"]
             }
-        }}
-    ]
+          ]
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "dob",
+      "type": ["string","null"]
+    },
+    {
+      "name": "siblings",
+      "type": [
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "siblingsType",
+            "fields": [
+              {
+                "name": "name",
+                "type": ["string","null"]
+              },
+              {
+                "name": "id",
+                "type": ["int","null"]
+              }
+            ]
+          }
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "gender",
+      "type": ["string","null"]
+    },
+    {
+      "name": "siblingIds",
+      "type": [
+        {
+          "type": "array",
+          "items": "string"
+        },
+        "null"
+      ]
+    }
+  ]
 }
 </pre>
 </code>
 
         <p>
             Let us also assume that this Controller Service is configured with 
the "Date Format" property set to "MM-dd-yyyy", as this
-            matches the date format used for our JSON data. This will result 
in the JSON creating two separate records, because the root
-            element is a JSON array with two elements.
+            matches the date format used for our YAML data. This will result 
in two separate records being created from the YAML, because the root
+            element is a YAML array with two elements.
         </p>
 
         <p>
@@ -351,8 +387,8 @@
         <h3>Nested Field Strategy</h3>
 
         <p>
-            Using the Nested Field strategy, consider the same JSON where the 
specified Starting Field Name is
-            "siblings". The schema that is configured for this JSON is as 
follows:
+            Using the Nested Field strategy, consider the same YAML where the 
specified Starting Field Name is
+            "siblings". The schema that is configured for this YAML is as 
follows:
         </p>
 
 <code>
@@ -410,8 +446,8 @@
         <h2>Schema Application Strategies</h2>
 
         <p>
-            When using JsonTreeReader with "Nested Field Strategy" and the 
"Schema Access Strategy" is not "Infer Schema",
-            it can be configured for the entire original JSON ("Whole JSON" 
strategy) or for the nested field section ("Selected part" strategy).
+            When using YamlTreeReader with "Nested Field Strategy" and the 
"Schema Access Strategy" is not "Infer Schema",
+            it can be configured for the entire original YAML ("Whole 
document" strategy) or for the nested field section ("Selected part" strategy).
         </p>
 
     </body>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 7c7f715eff..5f9e01ce12 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -326,7 +326,8 @@ class TestJsonTreeRowRecordReader {
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
         try (final InputStream in = new FileInputStream(inputFile);
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat, allowComments, streamReadConstraints)) {
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat,
+                     null, null, null, null, allowComments, 
streamReadConstraints, new JsonParserFactory())) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
similarity index 69%
copy from 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
copy to 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
index 7c7f715eff..befffcbadc 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.json;
+package org.apache.nifi.yaml;
 
-import com.fasterxml.jackson.core.StreamReadConstraints;
-import com.fasterxml.jackson.core.exc.StreamConstraintsException;
 import org.apache.avro.Schema;
 import org.apache.commons.io.FileUtils;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.json.JsonSchemaInference;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
 import org.apache.nifi.schema.inference.TimeValueInference;
@@ -36,17 +38,16 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.util.EqualsWrapper;
 import org.apache.nifi.util.MockComponentLog;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,13 +55,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -68,7 +69,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
-class TestJsonTreeRowRecordReader {
+class TestYamlTreeRowRecordReader {
+    private static final Path BANK_ACCOUNT_ARRAY = 
Paths.get("src/test/resources/yaml/bank-account-array.yaml");
     private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
     private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
     private final String timestampFormat = 
RecordFieldType.TIMESTAMP.getDefaultFormat();
@@ -101,13 +103,13 @@ class TestJsonTreeRowRecordReader {
     @Test
     void testReadChoiceOfStringOrArrayOfRecords() throws IOException, 
MalformedRecordException {
         final File schemaFile = new 
File("src/test/resources/json/choice-of-string-or-array-record.avsc");
-        final File jsonFile = new 
File("src/test/resources/json/choice-of-string-or-array-record.json");
+        final File jsonFile = new 
File("src/test/resources/yaml/choice-of-string-or-array-record.yaml");
 
         final Schema avroSchema = new Schema.Parser().parse(schemaFile);
         final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema);
 
-        try (final InputStream fis = new FileInputStream(jsonFile);
-            final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, 
dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream fis = Files.newInputStream(jsonFile.toPath());
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, 
dateFormat, timeFormat, timestampFormat)) {
 
             final Record record = reader.nextRecord();
             final Object[] fieldsArray = record.getAsArray("fields");
@@ -127,65 +129,6 @@ class TestJsonTreeRowRecordReader {
             final Record firstTypeRecord = (Record) firstType;
             assertEquals("string", firstTypeRecord.getAsString("type"));
         }
-
-    }
-
-    @Test
-    @Disabled("Intended only for manual testing to determine performance 
before/after modifications")
-    void testPerformanceOnLocalFile() throws IOException, 
MalformedRecordException {
-        final RecordSchema schema = new 
SimpleRecordSchema(Collections.emptyList());
-
-        final File file = new 
File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289");
-        final byte[] data = Files.readAllBytes(file.toPath());
-
-        final ComponentLog logger = mock(ComponentLog.class);
-
-        int recordCount = 0;
-        final int iterations = 1000;
-
-        for (int j = 0; j < 5; j++) {
-            final long start = System.nanoTime();
-            for (int i = 0; i < iterations; i++) {
-                try (final InputStream in = new ByteArrayInputStream(data);
-                    final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, 
timestampFormat)) {
-                    while (reader.nextRecord() != null) {
-                        recordCount++;
-                    }
-                }
-            }
-            final long nanos = System.nanoTime() - start;
-            final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
-            System.out.println("Took " + millis + " millis to read " + 
recordCount + " records");
-        }
-    }
-
-    @Test
-    @Disabled("Intended only for manual testing to determine performance 
before/after modifications")
-    void testPerformanceOnIndividualMessages() throws IOException, 
MalformedRecordException {
-        final RecordSchema schema = new 
SimpleRecordSchema(Collections.emptyList());
-
-        final File file = new 
File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json");
-        final byte[] data = Files.readAllBytes(file.toPath());
-
-        final ComponentLog logger = mock(ComponentLog.class);
-
-        int recordCount = 0;
-        final int iterations = 1_000_000;
-
-        for (int j = 0; j < 5; j++) {
-            final long start = System.nanoTime();
-            for (int i = 0; i < iterations; i++) {
-                try (final InputStream in = new ByteArrayInputStream(data);
-                    final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, 
timestampFormat)) {
-                    while (reader.nextRecord() != null) {
-                        recordCount++;
-                    }
-                }
-            }
-            final long nanos = System.nanoTime() - start;
-            final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
-            System.out.println("Took " + millis + " millis to read " + 
recordCount + " records");
-        }
     }
 
     @Test
@@ -193,8 +136,8 @@ class TestJsonTreeRowRecordReader {
         final Schema avroSchema = new Schema.Parser().parse(new 
File("src/test/resources/json/record-choice.avsc"));
         final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema);
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/elements-for-record-choice.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/elements-for-record-choice.yaml"));
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, 
timeFormat, timestampFormat)) {
 
             // evaluate first record
             final Record firstRecord = reader.nextRecord();
@@ -241,15 +184,14 @@ class TestJsonTreeRowRecordReader {
 
             assertNull(reader.nextRecord());
         }
-
     }
 
     @Test
     void testReadArray() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-array.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -271,89 +213,12 @@ class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    void testReadOneLinePerJSON() throws IOException, MalformedRecordException 
{
-        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
-
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-oneline.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
-
-            final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
-            assertEquals(expectedFieldNames, fieldNames);
-
-            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
-            assertEquals(expectedTypes, dataTypes);
-
-            final Object[] firstRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
-
-            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
-
-            assertNull(reader.nextRecord());
-        }
-    }
-
-    @Test
-    void testReadMultilineJSON() throws IOException, MalformedRecordException {
-        
testReadAccountJson("src/test/resources/json/bank-account-multiline.json", 
false, null);
-    }
-
-    @Test
-    void testReadJSONStringTooLong() {
-        final StreamConstraintsException mre = 
assertThrows(StreamConstraintsException.class, () ->
-                
testReadAccountJson("src/test/resources/json/bank-account-multiline.json", 
false, StreamReadConstraints.builder().maxStringLength(2).build()));
-        assertTrue(mre.getMessage().contains("maximum length"));
-        assertTrue(mre.getMessage().contains("(2)"));
-    }
-
-    @Test
-    void testReadJSONComments() throws IOException, MalformedRecordException {
-        
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", 
true, StreamReadConstraints.builder().maxStringLength(20_000).build());
-    }
-
-    @Test
-    void testReadJSONDisallowComments() {
-        final MalformedRecordException mre = 
assertThrows(MalformedRecordException.class, () ->
-            
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", 
false, StreamReadConstraints.builder().maxStringLength(20_000).build()));
-        assertTrue(mre.getMessage().contains("not parse"));
-    }
-
-    private void testReadAccountJson(final String inputFile, final boolean 
allowComments, final StreamReadConstraints streamReadConstraints) throws 
IOException, MalformedRecordException {
-        final List<RecordField> fields = 
getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-
-        try (final InputStream in = new FileInputStream(inputFile);
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat, allowComments, streamReadConstraints)) {
-
-            final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
-            assertEquals(expectedFieldNames, fieldNames);
-
-            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DECIMAL, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
-            assertEquals(expectedTypes, dataTypes);
-
-            final Object[] firstRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {1, "John Doe", 
BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, 
firstRecordValues);
-
-            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {2, "Jane Doe", 
BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", 
"USA"}, secondRecordValues);
-
-            assertNull(reader.nextRecord());
-        }
-    }
-
-    @Test
-    void testReadMultilineArrays() throws IOException, 
MalformedRecordException {
+    void testReadYamlComments() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-multiarray.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
-
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/bank-account-comments.yaml")))
 {
+            final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat,
+                    null, null, null, null);
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
@@ -369,48 +234,10 @@ class TestJsonTreeRowRecordReader {
             final Object[] secondRecordValues = 
reader.nextRecord().getValues();
             assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
 
-            final Object[] thirdRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My 
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
-
-            final Object[] fourthRecordValues = 
reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your 
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
-
             assertNull(reader.nextRecord());
         }
     }
 
-    @Test
-    void testReadMixedJSON() throws IOException, MalformedRecordException {
-        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
-
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-mixed.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
-
-            final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
-            assertEquals(expectedFieldNames, fieldNames);
-
-            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING);
-            assertEquals(expectedTypes, dataTypes);
-
-            final Object[] firstRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My 
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
-
-            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your 
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
-
-            final Object[] thirdRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My 
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
-
-            final Object[] fourthRecordValues = 
reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your 
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
-
-
-            assertNull(reader.nextRecord());
-        }
-    }
 
     @Test
     void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, 
MalformedRecordException {
@@ -419,8 +246,8 @@ class TestJsonTreeRowRecordReader {
         fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-array.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final Record schemaValidatedRecord = reader.nextRecord(true, true);
             assertEquals(1, schemaValidatedRecord.getValue("id"));
@@ -428,8 +255,8 @@ class TestJsonTreeRowRecordReader {
             assertNull(schemaValidatedRecord.getValue("balance"));
         }
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-array.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final Record rawRecord = reader.nextRecord(false, false);
             assertEquals(1, rawRecord.getValue("id"));
@@ -451,8 +278,8 @@ class TestJsonTreeRowRecordReader {
 
         final String expectedMap = "{id=1, name=John Doe, address=123 My 
Street, city=My City, state=MS, zipCode=11111, country=USA, 
account=MapRecord[{id=42, balance=4750.89}]}";
         final String expectedRecord = String.format("MapRecord[%s]", 
expectedMap);
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/single-element-nested.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested.yaml"));
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final Record rawRecord = reader.nextRecord(false, false);
 
@@ -470,8 +297,8 @@ class TestJsonTreeRowRecordReader {
         fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-array.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final Record schemaValidatedRecord = reader.nextRecord(true, true);
             assertEquals("1", schemaValidatedRecord.getValue("id")); // will 
be coerced into a STRING as per the schema
@@ -481,8 +308,8 @@ class TestJsonTreeRowRecordReader {
             assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
         }
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-array.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final Record rawRecord = reader.nextRecord(false, false);
             assertEquals(1, rawRecord.getValue("id")); // will return raw 
value of (int) 1
@@ -506,10 +333,10 @@ class TestJsonTreeRowRecordReader {
 
         final String date = "2000-01-01";
         final String datePattern = "yyyy-MM-dd";
-        final String json = String.format("{ \"%s\": \"%s\" }", dateField, 
date);
+        final String yaml = String.format("%s: %s", dateField, date);
         for (final boolean coerceTypes : new boolean[] {true, false}) {
-            try (final InputStream in = new 
ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
-                 final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, 
timeFormat, timestampFormat)) {
+            try (final InputStream in = new 
ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8));
+                 final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, 
timeFormat, timestampFormat)) {
 
                 final Record record = reader.nextRecord(coerceTypes, false);
                 final Object value = record.getValue(dateField);
@@ -525,8 +352,8 @@ class TestJsonTreeRowRecordReader {
         final RecordSchema schema = new SimpleRecordSchema(recordFields);
 
         for (final boolean coerceTypes : new boolean[] {true, false}) {
-            try (final InputStream in = new 
FileInputStream("src/test/resources/json/timestamp.json");
-                 final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+            try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/timestamp.yaml"));
+                 final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, "yyyy/MM/dd HH:mm:ss")) {
 
                 final Record record = reader.nextRecord(coerceTypes, false);
                 final Object value = record.getValue("timestamp");
@@ -539,8 +366,8 @@ class TestJsonTreeRowRecordReader {
     void testSingleJsonElement() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/single-bank-account.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/single-bank-account.yaml"));
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -565,8 +392,8 @@ class TestJsonTreeRowRecordReader {
                 .map(f -> new RecordField(f.getFieldName(), 
RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
         final RecordSchema schema = new SimpleRecordSchema(choiceFields);
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/single-bank-account.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/single-bank-account.yaml"));
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -596,8 +423,8 @@ class TestJsonTreeRowRecordReader {
         fields.remove(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/single-element-nested.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested.yaml"));
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
             final List<RecordFieldType> expectedTypes = 
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
@@ -628,8 +455,8 @@ class TestJsonTreeRowRecordReader {
         fields.remove(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/single-element-nested-array.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested-array.yaml"));
+             final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "address", "city", "state", "zipCode", "country", "accounts");
@@ -655,8 +482,8 @@ class TestJsonTreeRowRecordReader {
     void testReadArrayDifferentSchemas() throws IOException, 
MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/bank-account-array-different-schemas.yaml"));
+             final JsonTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -684,8 +511,8 @@ class TestJsonTreeRowRecordReader {
     void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws 
IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-array-optional-balance.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/bank-account-array-optional-balance.yaml"));
+             final JsonTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList("id", 
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -726,8 +553,8 @@ class TestJsonTreeRowRecordReader {
         fields.add(new RecordField("from", fromType));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new 
FileInputStream("src/test/resources/json/json-with-unicode.json");
-             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+        try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/yaml-with-unicode.yaml"));
+             final JsonTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
 
@@ -752,8 +579,8 @@ class TestJsonTreeRowRecordReader {
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
         MalformedRecordException mre = 
assertThrows(MalformedRecordException.class, () -> {
-            try (final InputStream in = new 
FileInputStream("src/test/resources/json/single-bank-account-wrong-field-type.json");
-                 final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
+            try (final InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/single-bank-account-wrong-field-type.yaml"));
+                 final YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, 
timeFormat, timestampFormat)) {
 
                 reader.nextRecord().getValues();
             }
@@ -768,8 +595,7 @@ class TestJsonTreeRowRecordReader {
 
     @Test
     void testMergeOfSimilarRecords() throws Exception {
-        // GIVEN
-        String jsonPath = "src/test/resources/json/similar-records.json";
+        String yamlPath = "src/test/resources/yaml/similar-records.yaml";
 
         RecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -794,21 +620,18 @@ class TestJsonTreeRowRecordReader {
             }})
         );
 
-        // WHEN
-        // THEN
-        testReadRecords(jsonPath, expected);
+        testReadRecords(yamlPath, expected);
     }
 
     @Test
     void testChoiceOfEmbeddedSimilarRecords() throws Exception {
-        // GIVEN
-        String jsonPath = 
"src/test/resources/json/choice-of-embedded-similar-records.json";
+        String yamlPath = 
"src/test/resources/yaml/choice-of-embedded-similar-records.yaml";
 
-        SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
         ));
-        SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
@@ -820,42 +643,39 @@ class TestJsonTreeRowRecordReader {
         ));
 
         List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
+                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>() {{
                     put("integer", 1);
                     put("boolean", true);
                 }}));
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema2, new 
HashMap<String, Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
+                put("record", new MapRecord(expectedRecordSchema2, new 
HashMap<String, Object>() {{
                     put("integer", 2);
                     put("string", "stringValue2");
                 }}));
             }})
         );
 
-        // WHEN
-        // THEN
-        testReadRecords(jsonPath, expected);
+        testReadRecords(yamlPath, expected);
     }
 
     @Test
     void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
-        // GIVEN
-        String jsonPath = 
"src/test/resources/json/choice-of-embedded-arrays-and-single-records.json";
+        String yamlPath = 
"src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml";
 
-        SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Collections.singletonList(
+        final SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Collections.singletonList(
                 new RecordField("integer", RecordFieldType.INT.getDataType())
         ));
-        SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
         ));
-        SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
-        SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
@@ -869,66 +689,63 @@ class TestJsonTreeRowRecordReader {
         ));
 
         List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
+                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>() {{
                     put("integer", 1);
                 }}));
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
                 put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 21);
-                        put("boolean", true);
-                    }}),
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 22);
-                        put("boolean", false);
-                    }})
+                        new MapRecord(expectedRecordSchema2, new 
HashMap<String, Object>() {{
+                            put("integer", 21);
+                            put("boolean", true);
+                        }}),
+                        new MapRecord(expectedRecordSchema2, new 
HashMap<String, Object>() {{
+                            put("integer", 22);
+                            put("boolean", false);
+                        }})
                 });
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<String, Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
+                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<String, Object>() {{
                     put("integer", 3);
                     put("string", "stringValue3");
                 }}));
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
                 put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 41);
-                        put("string", "stringValue41");
-                    }}),
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 42);
-                        put("string", "stringValue42");
-                    }})
+                        new MapRecord(expectedRecordSchema4, new 
HashMap<String, Object>() {{
+                            put("integer", 41);
+                            put("string", "stringValue41");
+                        }}),
+                        new MapRecord(expectedRecordSchema4, new 
HashMap<String, Object>() {{
+                            put("integer", 42);
+                            put("string", "stringValue42");
+                        }})
                 });
             }})
         );
 
-        // WHEN
-        // THEN
-        testReadRecords(jsonPath, expected);
+        testReadRecords(yamlPath, expected);
     }
 
     @Test
     void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
-        // GIVEN
-        String jsonPath = 
"src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json";
+        String yamlPath = 
"src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml";
 
-        SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
         ));
-        SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
         ));
-        SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
-        SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType()),
             new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
@@ -943,57 +760,54 @@ class TestJsonTreeRowRecordReader {
         ));
 
         List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
+                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>() {{
                     put("integer", 1);
                     put("boolean", false);
                 }}));
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
                 put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 21);
-                        put("boolean", true);
-                    }}),
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 22);
-                        put("boolean", false);
-                    }})
+                        new MapRecord(expectedRecordSchema2, new 
HashMap<String, Object>() {{
+                            put("integer", 21);
+                            put("boolean", true);
+                        }}),
+                        new MapRecord(expectedRecordSchema2, new 
HashMap<String, Object>() {{
+                            put("integer", 22);
+                            put("boolean", false);
+                        }})
                 });
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<String, Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
+                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<String, Object>() {{
                     put("integer", 3);
                     put("string", "stringValue3");
                 }}));
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
                 put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 41);
-                        put("string", "stringValue41");
-                    }}),
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 42);
-                        put("string", "stringValue42");
-                    }}),
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 43);
-                        put("boolean", false);
-                    }})
+                        new MapRecord(expectedRecordSchema4, new 
HashMap<String, Object>() {{
+                            put("integer", 41);
+                            put("string", "stringValue41");
+                        }}),
+                        new MapRecord(expectedRecordSchema4, new 
HashMap<String, Object>() {{
+                            put("integer", 42);
+                            put("string", "stringValue42");
+                        }}),
+                        new MapRecord(expectedRecordSchema4, new 
HashMap<String, Object>() {{
+                            put("integer", 43);
+                            put("boolean", false);
+                        }})
                 });
             }})
         );
 
-        // WHEN
-        // THEN
-        testReadRecords(jsonPath, expected);
+        testReadRecords(yamlPath, expected);
     }
 
     @Test
     void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
-        // GIVEN
-        String jsonPath = 
"src/test/resources/json/choice-of-different-arrays-with-extra-fields.json";
+        String yamlPath = 
"src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml";
 
         SimpleRecordSchema recordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -1035,47 +849,43 @@ class TestJsonTreeRowRecordReader {
         // Since the actual arrays have records with either (INT, BOOLEAN, 
STRING) or (INT, STRING, STRING)
         //  while the explicit schema defines only (INT, BOOLEAN) and (INT, 
STRING) we can't tell which record schema to chose
         //  so we take the first one (INT, BOOLEAN) - as best effort - for 
both cases
-        SimpleRecordSchema expectedSelectedRecordSchemaForRecordsInBothArrays 
= expectedChildSchema1;
-
         List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
                 put("record", new Object[]{
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 11);
-                        put("boolean", true);
-                        put("extraString", "extraStringValue11");
-                    }}),
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 12);
-                        put("boolean", false);
-                        put("extraString", "extraStringValue12");
-                    }})
+                        new MapRecord(expectedChildSchema1, new 
HashMap<String, Object>() {{
+                            put("integer", 11);
+                            put("boolean", true);
+                            put("extraString", "extraStringValue11");
+                        }}),
+                        new MapRecord(expectedChildSchema1, new 
HashMap<String, Object>() {{
+                            put("integer", 12);
+                            put("boolean", false);
+                            put("extraString", "extraStringValue12");
+                        }})
                 });
             }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
                 put("record", new Object[]{
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 21);
-                        put("extraString", "extraStringValue21");
-                        put("string", "stringValue21");
-                    }}),
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 22);
-                        put("extraString", "extraStringValue22");
-                        put("string", "stringValue22");
-                    }})
+                        new MapRecord(expectedChildSchema1, new 
HashMap<String, Object>() {{
+                            put("integer", 21);
+                            put("extraString", "extraStringValue21");
+                            put("string", "stringValue21");
+                        }}),
+                        new MapRecord(expectedChildSchema1, new 
HashMap<String, Object>() {{
+                            put("integer", 22);
+                            put("extraString", "extraStringValue22");
+                            put("string", "stringValue22");
+                        }})
                 });
             }})
         );
 
-        // WHEN
-        // THEN
-        testReadRecords(jsonPath, schema, expected);
+        testReadRecords(yamlPath, schema, expected);
     }
 
     @Test
     void testStartFromNestedArray() throws IOException, 
MalformedRecordException {
-        String jsonPath = 
"src/test/resources/json/single-element-nested-array.json";
+        String yamlPath = 
"src/test/resources/yaml/single-element-nested-array.yaml";
 
         SimpleRecordSchema expectedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1083,22 +893,22 @@ class TestJsonTreeRowRecordReader {
         ));
 
         List<Object> expected = Arrays.asList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>(){{
+                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
                     put("id", 42);
                     put("balance", 4750.89);
                 }}),
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>(){{
+                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
                     put("id", 43);
                     put("balance", 48212.38);
                 }})
         );
 
-        testReadRecords(jsonPath, expected, 
StartingFieldStrategy.NESTED_FIELD, "accounts");
+        testNestedReadRecords(yamlPath, expected, "accounts");
     }
 
     @Test
     void testStartFromNestedObject() throws IOException, 
MalformedRecordException {
-        String jsonPath = "src/test/resources/json/single-element-nested.json";
+        String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
 
         SimpleRecordSchema expectedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1112,12 +922,12 @@ class TestJsonTreeRowRecordReader {
                 }})
         );
 
-        testReadRecords(jsonPath, expected, 
StartingFieldStrategy.NESTED_FIELD, "account");
+        testNestedReadRecords(yamlPath, expected, "account");
     }
 
     @Test
     void testStartFromMultipleNestedField() throws IOException, 
MalformedRecordException {
-        String jsonPath = "src/test/resources/json/multiple-nested-field.json";
+        String yamlPath = "src/test/resources/yaml/multiple-nested-field.yaml";
 
         SimpleRecordSchema expectedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.STRING.getDataType()),
@@ -1125,40 +935,39 @@ class TestJsonTreeRowRecordReader {
         ));
 
         List<Object> expected = Arrays.asList(
-                    new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>(){{
+                    new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
                         put("id", "n312kj3");
                         put("type", "employee");
                     }}),
-                    new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>(){{
+                    new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
                         put("id", "dl2kdff");
                         put("type", "security");
                     }})
         );
 
-        testReadRecords(jsonPath, expected, 
StartingFieldStrategy.NESTED_FIELD, "accountIds");
+        testNestedReadRecords(yamlPath, expected, "accountIds");
     }
 
     @Test
     void testStartFromSimpleFieldReturnsEmptyJson() throws IOException, 
MalformedRecordException {
-        String jsonPath = "src/test/resources/json/single-element-nested.json";
+        String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
 
-        testReadRecords(jsonPath, Collections.emptyList(), 
StartingFieldStrategy.NESTED_FIELD, "name");
+        testNestedReadRecords(yamlPath, Collections.emptyList(), "name");
     }
 
     @Test
     void testStartFromNonExistentFieldWithDefinedSchema() throws IOException, 
MalformedRecordException {
-        String jsonPath = "src/test/resources/json/single-element-nested.json";
+        String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
 
         SimpleRecordSchema expectedRecordSchema = new 
SimpleRecordSchema(getDefaultFields());
         List<Object> expected = Collections.emptyList();
 
-        testReadRecords(jsonPath, expectedRecordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
-                "notfound", SchemaApplicationStrategy.SELECTED_PART);
+        testNestedReadRecords(yamlPath, expectedRecordSchema, expected, 
"notfound", SchemaApplicationStrategy.SELECTED_PART);
     }
 
     @Test
     void testStartFromNestedFieldThenStartObject() throws IOException, 
MalformedRecordException {
-        String jsonPath = 
"src/test/resources/json/nested-array-then-start-object.json";
+        String yamlPath = 
"src/test/resources/yaml/nested-array-then-start-object.yaml";
 
         SimpleRecordSchema expectedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1166,23 +975,22 @@ class TestJsonTreeRowRecordReader {
         ));
 
         List<Object> expected = Arrays.asList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>(){{
+                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
                     put("id", 42);
                     put("balance", 4750.89);
                 }}),
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>(){{
+                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
                     put("id", 43);
                     put("balance", 48212.38);
                 }})
         );
 
-        testReadRecords(jsonPath, expectedRecordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
-                "accounts", SchemaApplicationStrategy.SELECTED_PART);
+        testNestedReadRecords(yamlPath, expectedRecordSchema, expected, 
"accounts", SchemaApplicationStrategy.SELECTED_PART);
     }
 
     @Test
     void testStartFromNestedObjectWithWholeJsonSchemaScope() throws 
IOException, MalformedRecordException {
-        String jsonPath = "src/test/resources/json/single-element-nested.json";
+        String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
 
         RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1193,22 +1001,19 @@ class TestJsonTreeRowRecordReader {
                 new RecordField("account", 
RecordFieldType.RECORD.getRecordDataType(accountSchema))
         ));
 
-        RecordSchema expectedRecordSchema = accountSchema;
-
         List<Object> expected = Collections.singletonList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
+                new MapRecord(accountSchema, new HashMap<String, Object>() {{
                     put("id", 42);
                     put("balance", 4750.89);
                 }})
         );
 
-        testReadRecords(jsonPath, recordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
-                "account", SchemaApplicationStrategy.WHOLE_JSON);
+        testNestedReadRecords(yamlPath, recordSchema, expected, "account", 
SchemaApplicationStrategy.WHOLE_JSON);
     }
 
     @Test
     void testStartFromNestedArrayWithWholeJsonSchemaScope() throws 
IOException, MalformedRecordException {
-        String jsonPath = 
"src/test/resources/json/single-element-nested-array.json";
+        String yamlPath = 
"src/test/resources/yaml/single-element-nested-array.yaml";
 
         RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1219,26 +1024,23 @@ class TestJsonTreeRowRecordReader {
                 new RecordField("accounts", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountSchema)))
         ));
 
-        RecordSchema expectedRecordSchema = accountSchema;
-
         List<Object> expected = Arrays.asList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
+                new MapRecord(accountSchema, new HashMap<String, Object>() {{
                     put("id", 42);
                     put("balance", 4750.89);
                 }}),
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
+                new MapRecord(accountSchema, new HashMap<String, Object>() {{
                     put("id", 43);
                     put("balance", 48212.38);
                 }})
         );
 
-        testReadRecords(jsonPath, recordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
-                "accounts", SchemaApplicationStrategy.WHOLE_JSON);
+        testNestedReadRecords(yamlPath, recordSchema, expected, "accounts", 
SchemaApplicationStrategy.WHOLE_JSON);
     }
 
     @Test
     void testStartFromDeepNestedObject() throws IOException, 
MalformedRecordException {
-        String jsonPath = 
"src/test/resources/json/single-element-deep-nested.json";
+        String yamlPath = 
"src/test/resources/yaml/single-element-deep-nested.yaml";
 
         RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
                 new RecordField("rootInt", RecordFieldType.INT.getDataType()),
@@ -1269,13 +1071,12 @@ class TestJsonTreeRowRecordReader {
                 }})
         );
 
-        testReadRecords(jsonPath, recordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
-                "nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
+        testNestedReadRecords(yamlPath, recordSchema, expected, 
"nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
     }
 
     @Test
     void testCaptureFields() throws IOException, MalformedRecordException {
-        Map<String, String> expectedCapturedFields = new HashMap<>();
+        Map<String, String> expectedCapturedFields = new HashMap<String, 
String>();
         expectedCapturedFields.put("id", "1");
         expectedCapturedFields.put("zipCode", "11111");
         expectedCapturedFields.put("country", "USA");
@@ -1307,22 +1108,27 @@ class TestJsonTreeRowRecordReader {
                 new RecordField("country", 
RecordFieldType.STRING.getDataType())
         ));
 
-        try (InputStream in = new 
FileInputStream("src/test/resources/json/capture-fields.json")) {
-            JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(
+        try (InputStream in = 
Files.newInputStream(Paths.get("src/test/resources/yaml/capture-fields.yaml"))) 
{
+            YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(
                     in, mock(ComponentLog.class), recordSchema,
                     dateFormat, timeFormat, timestampFormat,
                     StartingFieldStrategy.NESTED_FIELD, startingFieldName,
                     SchemaApplicationStrategy.SELECTED_PART, capturePredicate);
 
-            while (reader.nextRecord() != null);
+            int records = 0;
+            while (reader.nextRecord() != null) {
+                records++;
+            }
+            assertNotEquals(0, records);
+
             Map<String, String> capturedFields = reader.getCapturedFields();
 
             assertEquals(expectedCapturedFields, capturedFields);
         }
     }
 
-    private void testReadRecords(String jsonPath, List<Object> expected) 
throws IOException, MalformedRecordException {
-        final File jsonFile = new File(jsonPath);
+    private void testReadRecords(String yamlPath, List<Object> expected) 
throws IOException, MalformedRecordException {
+        final File jsonFile = new File(yamlPath);
         try (
             InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
         ) {
@@ -1331,42 +1137,37 @@ class TestJsonTreeRowRecordReader {
         }
     }
 
-    private void testReadRecords(String jsonPath,
-                                 List<Object> expected,
-                                 StartingFieldStrategy strategy,
-                                 String startingFieldName)
-            throws IOException, MalformedRecordException {
+    private void testNestedReadRecords(String yamlPath, List<Object> expected, 
String startingFieldName) throws IOException, MalformedRecordException {
 
-        final File jsonFile = new File(jsonPath);
+        final File jsonFile = new File(yamlPath);
         try (InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
-            RecordSchema schema = inferSchema(jsonStream, strategy, 
startingFieldName);
-            testReadRecords(jsonStream, schema, expected, strategy, 
startingFieldName, SchemaApplicationStrategy.SELECTED_PART);
+            RecordSchema schema = inferSchema(jsonStream, 
StartingFieldStrategy.NESTED_FIELD, startingFieldName);
+            testNestedReadRecords(jsonStream, schema, expected, 
startingFieldName, SchemaApplicationStrategy.SELECTED_PART);
         }
     }
 
-    private void testReadRecords(String jsonPath, RecordSchema schema, 
List<Object> expected) throws IOException, MalformedRecordException {
-        final File jsonFile = new File(jsonPath);
+    private void testReadRecords(String yamlPath, RecordSchema schema, 
List<Object> expected) throws IOException, MalformedRecordException {
+        final File jsonFile = new File(yamlPath);
         try (InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
             testReadRecords(jsonStream, schema, expected);
         }
     }
 
-    private void testReadRecords(String jsonPath,
+    private void testNestedReadRecords(String yamlPath,
                                  RecordSchema schema,
                                  List<Object> expected,
-                                 StartingFieldStrategy strategy,
                                  String startingFieldName,
                                  SchemaApplicationStrategy 
schemaApplicationStrategy
     ) throws IOException, MalformedRecordException {
 
-        final File jsonFile = new File(jsonPath);
-        try (InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
-            testReadRecords(jsonStream, schema, expected, strategy, 
startingFieldName, schemaApplicationStrategy);
+        final File yamlFile = new File(yamlPath);
+        try (InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(yamlFile))) {
+            testNestedReadRecords(jsonStream, schema, expected, 
startingFieldName, schemaApplicationStrategy);
         }
     }
 
-    private void testReadRecords(InputStream jsonStream, RecordSchema schema, 
List<Object> expected) throws IOException, MalformedRecordException {
-        try (JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+    private void testReadRecords(InputStream yamlStream, RecordSchema schema, 
List<Object> expected) throws IOException, MalformedRecordException {
+        try (YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(yamlStream, mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
             List<Object> actual = new ArrayList<>();
             Record record;
             while ((record = reader.nextRecord()) != null) {
@@ -1392,16 +1193,15 @@ class TestJsonTreeRowRecordReader {
         }
     }
 
-    private void testReadRecords(InputStream jsonStream,
+    private void testNestedReadRecords(InputStream yamlStream,
                                  RecordSchema schema,
                                  List<Object> expected,
-                                 StartingFieldStrategy strategy,
                                  String startingFieldName,
                                  SchemaApplicationStrategy 
schemaApplicationStrategy)
             throws IOException, MalformedRecordException {
 
-        try (JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat,
-                strategy, startingFieldName, schemaApplicationStrategy, null)) 
{
+        try (YamlTreeRowRecordReader reader = new 
YamlTreeRowRecordReader(yamlStream, mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat,
+                StartingFieldStrategy.NESTED_FIELD, startingFieldName, 
schemaApplicationStrategy, null)) {
             List<Object> actual = new ArrayList<>();
             Record record;
 
@@ -1429,7 +1229,7 @@ class TestJsonTreeRowRecordReader {
 
     private RecordSchema inferSchema(InputStream jsonStream, 
StartingFieldStrategy strategy, String startingFieldName) throws IOException {
         RecordSchema schema = new InferSchemaAccessStrategy<>(
-            (__, inputStream) -> new JsonRecordSource(inputStream, strategy, 
startingFieldName),
+            (__, inputStream) -> new YamlRecordSource(inputStream, strategy, 
startingFieldName),
             new JsonSchemaInference(new TimeValueInference(null, null, null)),
             mock(ComponentLog.class)
         ).getSchema(Collections.emptyMap(), jsonStream, null);
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-different-schemas.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-different-schemas.yaml
new file mode 100644
index 0000000000..168771b317
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-different-schemas.yaml
@@ -0,0 +1,24 @@
+- id: 1
+  name: John Doe
+  balance: 4750.89
+  address: 123 My Street
+  city: My City
+  state: MS
+  zipCode: "11111"
+  country: USA
+- id: 2
+  name: Jane Doe
+  balance: 4820.09
+  address: 321 Your Street
+  city: Your City
+  state: NY
+  zipCode: "33333"
+- id: 3
+  name: Jake Doe
+  balance: 4751.89
+  address: 124 My Street
+  address2: 'Apt. #12'
+  city: My City
+  state: MS
+  zipCode: "11111"
+  country: USA
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-optional-balance.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-optional-balance.yaml
new file mode 100644
index 0000000000..c7e404564a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-optional-balance.yaml
@@ -0,0 +1,23 @@
+- id: 1
+  name: John Doe
+  balance: 4750.89
+  address: 123 My Street
+  city: My City
+  state: MS
+  zipCode: "11111"
+  country: USA
+- id: 2
+  name: Jane Doe
+  balance: null
+  address: 321 Your Street
+  city: Your City
+  state: NY
+  zipCode: "33333"
+  country: USA
+- id: 3
+  name: Jimmy Doe
+  address: 321 Your Street
+  city: Your City
+  state: NY
+  zipCode: "33333"
+  country: USA
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array.yaml
new file mode 100644
index 0000000000..b36049387c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array.yaml
@@ -0,0 +1,16 @@
+- id: 1
+  name: John Doe
+  balance: 4750.89
+  address: 123 My Street
+  city: My City
+  state: MS
+  zipCode: "11111"
+  country: USA
+- id: 2
+  name: Jane Doe
+  balance: 4820.09
+  address: 321 Your Street
+  city: Your City
+  state: NY
+  zipCode: "33333"
+  country: USA
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-comments.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-comments.yaml
new file mode 100644
index 0000000000..c73597c549
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-comments.yaml
@@ -0,0 +1,20 @@
+# comment before objects
+- id: 1
+  name: John Doe
+  # comment in object
+  balance: 4750.89
+  address: 123 My Street
+  city: My City
+  state: MS
+  zipCode: "11111"
+  country: USA
+# comment between objects
+- id: 2
+  name: Jane Doe
+  balance: 4820.09
+  address: 321 Your Street
+  city: Your City
+  state: NY
+  zipCode: "33333"
+  country: USA
+# Comment after objects
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/capture-fields.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/capture-fields.yaml
new file mode 100644
index 0000000000..5ed3412d0c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/capture-fields.yaml
@@ -0,0 +1,15 @@
+id: 1
+accounts:
+  - id: 42
+    balance: 4750.89
+  - id: 43
+    balance: 48212.38
+name: John Doe
+address: 123 My Street
+city: My City
+job:
+  salary: 115431
+  position: acountant
+state: MS
+zipCode: "11111"
+country: USA
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml
new file mode 100644
index 0000000000..880166a76c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml
@@ -0,0 +1,15 @@
+dataCollection:
+  - record:
+      - integer: 11
+        boolean: true
+        extraString: extraStringValue11
+      - integer: 12
+        boolean: false
+        extraString: extraStringValue12
+  - record:
+      - integer: 21
+        string: stringValue21
+        extraString: extraStringValue21
+      - integer: 22
+        string: stringValue22
+        extraString: extraStringValue22
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml
new file mode 100644
index 0000000000..7eada6d9d1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml
@@ -0,0 +1,16 @@
+dataCollection:
+  - record:
+      integer: 1
+  - record:
+      - integer: 21
+        boolean: true
+      - integer: 22
+        boolean: false
+  - record:
+      integer: 3
+      string: stringValue3
+  - record:
+      - integer: 41
+        string: stringValue41
+      - integer: 42
+        string: stringValue42
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-similar-records.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-similar-records.yaml
new file mode 100644
index 0000000000..826e064b98
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-similar-records.yaml
@@ -0,0 +1,7 @@
+dataCollection:
+  - record:
+      integer: 1
+      boolean: true
+  - record:
+      integer: 2
+      string: stringValue2
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
new file mode 100644
index 0000000000..603d3ddbbb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
@@ -0,0 +1,19 @@
+dataCollection:
+  - record:
+      integer: 1
+      boolean: false
+  - record:
+      - integer: 21
+        boolean: true
+      - integer: 22
+        boolean: false
+  - record:
+      integer: 3
+      string: stringValue3
+  - record:
+      - integer: 41
+        string: stringValue41
+      - integer: 42
+        string: stringValue42
+      - integer: 43
+        boolean: false
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-string-or-array-record.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-string-or-array-record.yaml
new file mode 100644
index 0000000000..8274e9c158
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-string-or-array-record.yaml
@@ -0,0 +1,4 @@
+fields:
+  - type: string
+  - type:
+      - type: string
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/elements-for-record-choice.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/elements-for-record-choice.yaml
new file mode 100644
index 0000000000..615fefa854
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/elements-for-record-choice.yaml
@@ -0,0 +1,6 @@
+- id: "1234"
+  child:
+    id: "4321"
+- id: "1234"
+  child:
+    name: child
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/multiple-nested-field.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/multiple-nested-field.yaml
new file mode 100644
index 0000000000..558ece7588
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/multiple-nested-field.yaml
@@ -0,0 +1,13 @@
+- id: 1
+  name: John Doe
+  balance: 4750.89
+  address: 123 My Street
+  city: My City
+  state: MS
+  zipCode: "11111"
+  country: USA
+  accountIds:
+    - id: n312kj3
+      type: employee
+    - id: dl2kdff
+      type: security
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/nested-array-then-start-object.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/nested-array-then-start-object.yaml
new file mode 100644
index 0000000000..133bb10998
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/nested-array-then-start-object.yaml
@@ -0,0 +1,9 @@
+- id: 17
+  name: John
+  accounts:
+    - id: 42
+      balance: 4750.89
+    - id: 43
+      balance: 48212.38
+- id: 98
+  balance: 67829.12
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/similar-records.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/similar-records.yaml
new file mode 100644
index 0000000000..7478e59a4a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/similar-records.yaml
@@ -0,0 +1,7 @@
+dataCollection:
+  - integer: 1
+    boolean: true
+    booleanOrString: true
+  - integer: 2
+    string: stringValue2
+    booleanOrString: booleanOrStringValue2
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account-wrong-field-type.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account-wrong-field-type.yaml
new file mode 100644
index 0000000000..5088aeeced
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account-wrong-field-type.yaml
@@ -0,0 +1,10 @@
+id: 1
+name: John Doe
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
+account:
+  id: 42
+  balance: true
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account.yaml
new file mode 100644
index 0000000000..8d3c6d1fb6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account.yaml
@@ -0,0 +1,8 @@
+id: 1
+name: John Doe
+balance: 4750.89
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-deep-nested.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-deep-nested.yaml
new file mode 100644
index 0000000000..af0458706b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-deep-nested.yaml
@@ -0,0 +1,8 @@
+rootInt: 100
+rootString: root_string
+nestedLevel1Record:
+  nestedLevel1Int: 110
+  nestedLevel1String: root.level1:string
+  nestedLevel2Record:
+    nestedLevel2Int: 111
+    nestedLevel2String: root.level1.level2:string
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested-array.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested-array.yaml
new file mode 100644
index 0000000000..6da7cddedc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested-array.yaml
@@ -0,0 +1,12 @@
+id: 1
+name: John Doe
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
+accounts:
+  - id: 42
+    balance: 4750.89
+  - id: 43
+    balance: 48212.38
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested.yaml
new file mode 100644
index 0000000000..c452c2f0e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested.yaml
@@ -0,0 +1,10 @@
+id: 1
+name: John Doe
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
+account:
+  id: 42
+  balance: 4750.89
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/timestamp.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/timestamp.yaml
new file mode 100644
index 0000000000..aa94607246
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/timestamp.yaml
@@ -0,0 +1,2 @@
+timestamp: 2019/06/27 13:04:04
+field_not_in_schema: some_value
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/yaml-with-unicode.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/yaml-with-unicode.yaml
new file mode 100644
index 0000000000..5dfe632940
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/yaml-with-unicode.yaml
@@ -0,0 +1,6 @@
+created_at: Thu Feb 16 01:19:42 +0000 2017
+id: 832036744985577473
+unicode: ちゃ泣きそう
+from:
+  id: 788946702264507903
+  name: john


Reply via email to