This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4b95129f96 NIFI-11197 Added YamlTreeReader
4b95129f96 is described below
commit 4b95129f962b0dfa2b1119f54d096c3a8464cfa5
Author: dan-s1 <[email protected]>
AuthorDate: Mon Oct 16 18:14:49 2023 +0000
NIFI-11197 Added YamlTreeReader
- Adjusted JsonTreeReader implementation for sharing common Jackson
components
This closes #7665
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/json/AbstractJsonRowRecordReader.java | 41 +-
.../org/apache/nifi/json/JsonParserFactory.java | 42 ++
.../apache/nifi/json/JsonPathRowRecordReader.java | 2 +-
.../org/apache/nifi/json/JsonRecordSource.java | 28 +-
.../apache/nifi/json/JsonTreeRowRecordReader.java | 18 +-
.../org/apache/nifi/json/TokenParserFactory.java | 36 ++
.../nifi-yaml-record-utils/pom.xml | 45 ++
.../org/apache/nifi/yaml/YamlParserFactory.java | 44 ++
.../org/apache/nifi/yaml/YamlRecordSource.java | 29 +
.../apache/nifi/yaml/YamlTreeRowRecordReader.java | 47 ++
.../nifi-extension-utils/nifi-record-utils/pom.xml | 1 +
.../nifi-record-serialization-services/pom.xml | 6 +
.../java/org/apache/nifi/json/JsonTreeReader.java | 51 +-
.../java/org/apache/nifi/yaml/YamlTreeReader.java | 81 +++
.../org.apache.nifi.controller.ControllerService | 1 +
.../additionalDetails.html | 2 +-
.../additionalDetails.html | 176 +++---
.../nifi/json/TestJsonTreeRowRecordReader.java | 3 +-
.../TestYamlTreeRowRecordReader.java} | 606 +++++++--------------
.../yaml/bank-account-array-different-schemas.yaml | 24 +
.../yaml/bank-account-array-optional-balance.yaml | 23 +
.../test/resources/yaml/bank-account-array.yaml | 16 +
.../test/resources/yaml/bank-account-comments.yaml | 20 +
.../src/test/resources/yaml/capture-fields.yaml | 15 +
...oice-of-different-arrays-with-extra-fields.yaml | 15 +
...oice-of-embedded-arrays-and-single-records.yaml | 16 +
.../yaml/choice-of-embedded-similar-records.yaml | 7 +
...-merged-embedded-arrays-and-single-records.yaml | 19 +
.../yaml/choice-of-string-or-array-record.yaml | 4 +
.../resources/yaml/elements-for-record-choice.yaml | 6 +
.../test/resources/yaml/multiple-nested-field.yaml | 13 +
.../yaml/nested-array-then-start-object.yaml | 9 +
.../src/test/resources/yaml/similar-records.yaml | 7 +
.../yaml/single-bank-account-wrong-field-type.yaml | 10 +
.../test/resources/yaml/single-bank-account.yaml | 8 +
.../resources/yaml/single-element-deep-nested.yaml | 8 +
.../yaml/single-element-nested-array.yaml | 12 +
.../test/resources/yaml/single-element-nested.yaml | 10 +
.../src/test/resources/yaml/timestamp.yaml | 2 +
.../src/test/resources/yaml/yaml-with-unicode.yaml | 6 +
40 files changed, 956 insertions(+), 553 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index d718e7e4bc..e3f6d8567e 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
@@ -56,7 +55,7 @@ import java.util.function.Supplier;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
public static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
- static final PropertyDescriptor MAX_STRING_LENGTH = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor MAX_STRING_LENGTH = new
PropertyDescriptor.Builder()
.name("Max String Length")
.displayName("Max String Length")
.description("The maximum allowed length of a string value when
parsing the JSON document")
@@ -88,7 +87,6 @@ public abstract class AbstractJsonRowRecordReader implements
RecordReader {
private JsonParser jsonParser;
private JsonNode firstJsonNode;
private StartingFieldStrategy strategy;
-
private Map<String, String> capturedFields;
private BiPredicate<String, String> captureFieldPredicate;
@@ -104,28 +102,6 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
lazyTimestampFormat = () -> tsf;
}
- protected AbstractJsonRowRecordReader(final InputStream in,
- final ComponentLog logger,
- final String dateFormat,
- final String timeFormat,
- final String timestampFormat)
- throws IOException, MalformedRecordException {
-
- this(in, logger, dateFormat, timeFormat, timestampFormat, null, null,
null, false, null);
- }
-
- protected AbstractJsonRowRecordReader(final InputStream in,
- final ComponentLog logger,
- final String dateFormat,
- final String timeFormat,
- final String timestampFormat,
- final boolean allowComments,
- final StreamReadConstraints
streamReadConstraints)
- throws IOException, MalformedRecordException {
-
- this(in, logger, dateFormat, timeFormat, timestampFormat, null, null,
null, allowComments, streamReadConstraints);
- }
-
/**
* Constructor with initial logic for JSON to NiFi record parsing.
*
@@ -140,7 +116,7 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
* be accessed by calling {@link
#getCapturedFields()}
* @param allowComments whether to allow comments within the JSON
stream
* @param streamReadConstraints configuration for the JsonFactory stream
reader {@link StreamReadConstraints}
- *
+ * @param tokenParserFactory factory to provide an instance of
com.fasterxml.jackson.core.JsonParser
* @throws IOException in case of JSON stream processing
failure
* @throws MalformedRecordException in case of malformed JSON input
*/
@@ -153,7 +129,8 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
final String nestedFieldName,
final BiPredicate<String, String>
captureFieldPredicate,
final boolean allowComments,
- final StreamReadConstraints
streamReadConstraints)
+ final StreamReadConstraints
streamReadConstraints,
+ final TokenParserFactory
tokenParserFactory)
throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
@@ -163,14 +140,8 @@ public abstract class AbstractJsonRowRecordReader
implements RecordReader {
capturedFields = new LinkedHashMap<>();
try {
- final ObjectMapper codec = new ObjectMapper();
- if (allowComments) {
- codec.enable(JsonParser.Feature.ALLOW_COMMENTS);
- }
- codec.getFactory().setStreamReadConstraints(streamReadConstraints
!= null ? streamReadConstraints : DEFAULT_STREAM_READ_CONSTRAINTS);
-
- jsonParser = codec.getFactory().createParser(in);
- jsonParser.setCodec(codec);
+ final StreamReadConstraints configuredStreamReadConstraints =
streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS :
streamReadConstraints;
+ jsonParser = tokenParserFactory.getJsonParser(in,
configuredStreamReadConstraints, allowComments);
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
while (jsonParser.nextToken() != null) {
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java
new file mode 100644
index 0000000000..d2e7484b3d
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
+public class JsonParserFactory implements TokenParserFactory {
+ @Override
+ public JsonParser getJsonParser(final InputStream in, final
StreamReadConstraints streamReadConstraints, final boolean allowComments)
throws IOException {
+ Objects.requireNonNull(in, "Input Stream required");
+ Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints
required");
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+ if (allowComments) {
+ objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
+ }
+ final JsonFactory jsonFactory = objectMapper.getFactory();
+ return jsonFactory.createParser(in);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 328cf0c868..bc9ba69800 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -66,7 +66,7 @@ public class JsonPathRowRecordReader extends
AbstractJsonRowRecordReader {
final boolean allowComments, final
StreamReadConstraints streamReadConstraints)
throws MalformedRecordException, IOException {
- super(in, logger, dateFormat, timeFormat, timestampFormat,
allowComments, streamReadConstraints);
+ super(in, logger, dateFormat, timeFormat, timestampFormat, null, null,
null, allowComments, streamReadConstraints, new JsonParserFactory());
this.schema = schema;
this.jsonPaths = jsonPaths;
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
index 348c2ef02f..5ff735fbdd 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java
@@ -16,12 +16,11 @@
*/
package org.apache.nifi.json;
-import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.schema.inference.RecordSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,29 +30,28 @@ import java.io.InputStream;
public class JsonRecordSource implements RecordSource<JsonNode> {
private static final Logger logger =
LoggerFactory.getLogger(JsonRecordSource.class);
- private static final JsonFactory jsonFactory;
+
+ private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS
= StreamReadConstraints.defaults();
+
+ private static final boolean ALLOW_COMMENTS_ENABLED = true;
+
private final JsonParser jsonParser;
private final StartingFieldStrategy strategy;
- private final String startingFieldName;
-
- static {
- jsonFactory = new JsonFactory();
- jsonFactory.setCodec(new ObjectMapper());
- }
public JsonRecordSource(final InputStream in) throws IOException {
- jsonParser = jsonFactory.createParser(in);
- strategy = null;
- startingFieldName = null;
+ this(in, null, null);
}
public JsonRecordSource(final InputStream in, final StartingFieldStrategy
strategy, final String startingFieldName) throws IOException {
- jsonParser = jsonFactory.createParser(in);
+ this(in , strategy, startingFieldName, new JsonParserFactory());
+ }
+
+ public JsonRecordSource(final InputStream in, final StartingFieldStrategy
strategy, final String startingFieldName, TokenParserFactory
tokenParserFactory) throws IOException {
+ jsonParser = tokenParserFactory.getJsonParser(in,
DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);
this.strategy = strategy;
- this.startingFieldName = startingFieldName;
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
- final SerializedString serializedNestedField = new
SerializedString(this.startingFieldName);
+ final SerializedString serializedNestedField = new
SerializedString(startingFieldName);
while (!jsonParser.nextFieldName(serializedNestedField) &&
jsonParser.hasCurrentToken());
logger.debug("Parsing starting at nested field [{}]",
startingFieldName);
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 758cd96a48..489474e804 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -56,16 +56,7 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
final String dateFormat, final String
timeFormat, final String timestampFormat)
throws IOException, MalformedRecordException {
- this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
false, null);
- }
-
- public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
- final String dateFormat, final String
timeFormat, final String timestampFormat,
- final boolean allowComments, final
StreamReadConstraints streamReadConstraints)
- throws IOException, MalformedRecordException {
-
- this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
null, null, null, null,
- allowComments, streamReadConstraints);
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
null, null, null, null);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
@@ -75,18 +66,18 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
throws IOException, MalformedRecordException {
this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
- captureFieldPredicate, false, null);
+ captureFieldPredicate, false, null, new JsonParserFactory());
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
final String dateFormat, final String
timeFormat, final String timestampFormat,
final StartingFieldStrategy
startingFieldStrategy, final String startingFieldName,
final SchemaApplicationStrategy
schemaApplicationStrategy, final BiPredicate<String, String>
captureFieldPredicate,
- final boolean allowComments, final
StreamReadConstraints streamReadConstraints)
+ final boolean allowComments, final
StreamReadConstraints streamReadConstraints, final TokenParserFactory
tokenParserFactory)
throws IOException, MalformedRecordException {
super(in, logger, dateFormat, timeFormat, timestampFormat,
startingFieldStrategy, startingFieldName, captureFieldPredicate,
- allowComments, streamReadConstraints);
+ allowComments, streamReadConstraints, tokenParserFactory);
if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD &&
schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
this.schema = getSelectedSchema(schema, startingFieldName);
@@ -110,7 +101,6 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
}
}
}
-
}
throw new RuntimeException(String.format("Selected schema field [%s]
not found.", startingFieldName));
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java
new file mode 100644
index 0000000000..1c3a812fe2
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface TokenParserFactory {
+ /**
+ * Get JSON Parser implementation for provided Input Stream with
configured settings
+ *
+ * @param in Input Stream to be parsed
+ * @param streamReadConstraints Stream Read Constraints applied
+ * @param allowComments Whether to allow comments when parsing
+ * @return JSON Parser
+ * @throws IOException Thrown on failures to read the Input Stream
+ */
+ JsonParser getJsonParser(InputStream in, StreamReadConstraints
streamReadConstraints, boolean allowComments) throws IOException;
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/pom.xml
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/pom.xml
new file mode 100644
index 0000000000..3fd1f0d8b9
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-utils</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-yaml-record-utils</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-record-utils</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java
new file mode 100644
index 0000000000..a6df6c198a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.yaml;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+import org.apache.nifi.json.TokenParserFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class YamlParserFactory implements TokenParserFactory {
+ private static final YAMLFactory YAML_FACTORY = new YAMLFactory(new
YAMLMapper());
+
+ /**
+ * Get Parser implementation for YAML
+ *
+ * @param in Input Stream to be parsed
+ * @param streamReadConstraints Stream Read Constraints are not supported
in YAML
+ * @param allowComments Whether to allow comments when parsing does not
apply to YAML
+ * @return YAML Parser
+ * @throws IOException Thrown on parser creation failures
+ */
+ @Override
+ public JsonParser getJsonParser(final InputStream in, final
StreamReadConstraints streamReadConstraints, final boolean allowComments)
throws IOException {
+ return YAML_FACTORY.createParser(in);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java
new file mode 100644
index 0000000000..f47a591de6
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.yaml;
+
+import org.apache.nifi.json.JsonRecordSource;
+import org.apache.nifi.json.StartingFieldStrategy;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class YamlRecordSource extends JsonRecordSource {
+ public YamlRecordSource(final InputStream in, final StartingFieldStrategy
strategy, final String startingFieldName) throws IOException {
+ super(in, strategy, startingFieldName, new YamlParserFactory());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java
new file mode 100644
index 0000000000..25136d85aa
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.yaml;
+
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.BiPredicate;
+
+public class YamlTreeRowRecordReader extends JsonTreeRowRecordReader {
+
+ public YamlTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
+ final String dateFormat, final String
timeFormat, final String timestampFormat) throws IOException,
MalformedRecordException {
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
null, null, null, null);
+ }
+
+ public YamlTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
+ final String dateFormat, final String
timeFormat, final String timestampFormat,
+ final StartingFieldStrategy
startingFieldStrategy, final String startingFieldName,
+ final SchemaApplicationStrategy
schemaApplicationStrategy, final BiPredicate<String, String>
captureFieldPredicate)
+ throws IOException, MalformedRecordException {
+
+ super(in, logger, schema, dateFormat, timeFormat, timestampFormat,
startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
+ captureFieldPredicate, true, null, new YamlParserFactory());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
index 89820015c7..d87e21a9a3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml
@@ -30,6 +30,7 @@
<module>nifi-json-record-utils</module>
<module>nifi-mock-record-utils</module>
<module>nifi-schema-inference-utils</module>
+ <module>nifi-yaml-record-utils</module>
</modules>
<dependencyManagement>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 8011be927d..8227f6f6e1 100755
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -68,6 +68,11 @@
<artifactId>nifi-json-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-yaml-record-utils</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
@@ -270,6 +275,7 @@
<exclude>src/test/resources/xml/testschema</exclude>
<exclude>src/test/resources/xml/testschema2</exclude>
<exclude>src/test/resources/xml/testschema3</exclude>
+ <exclude>src/test/resources/yaml/*.yaml</exclude>
</excludes>
</configuration>
</plugin>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index bcf7d66c95..596b3880bd 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -70,12 +70,12 @@ import static
org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+ "See the Usage of the Controller Service for more information and
examples.")
@SeeAlso(JsonPathReader.class)
public class JsonTreeReader extends SchemaRegistryService implements
RecordReaderFactory {
- private volatile String dateFormat;
- private volatile String timeFormat;
- private volatile String timestampFormat;
- private volatile String startingFieldName;
- private volatile StartingFieldStrategy startingFieldStrategy;
- private volatile SchemaApplicationStrategy schemaApplicationStrategy;
+ protected volatile String dateFormat;
+ protected volatile String timeFormat;
+ protected volatile String timestampFormat;
+ protected volatile String startingFieldName;
+ protected volatile StartingFieldStrategy startingFieldStrategy;
+ protected volatile SchemaApplicationStrategy schemaApplicationStrategy;
private volatile boolean allowComments;
private volatile StreamReadConstraints streamReadConstraints;
@@ -138,9 +138,29 @@ public class JsonTreeReader extends SchemaRegistryService
implements RecordReade
this.startingFieldStrategy =
StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
this.startingFieldName =
context.getProperty(STARTING_FIELD_NAME).getValue();
this.schemaApplicationStrategy =
SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue());
+ this.streamReadConstraints = buildStreamReadConstraints(context);
+ this.allowComments = isAllowCommentsEnabled(context);
+ }
+
+ /**
+ * Build Stream Read Constraints based on available properties
+ *
+ * @param context Configuration Context with property values
+ * @return Stream Read Constraints
+ */
+ protected StreamReadConstraints buildStreamReadConstraints(final
ConfigurationContext context) {
final int maxStringLength =
context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
- this.streamReadConstraints =
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
- this.allowComments =
context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
+ return
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+ }
+
+ /**
+ * Determine whether to allow comments when parsing based on available
properties
+ *
+ * @param context Configuration Context with property values
+ * @return Allow comments status
+ */
+ protected boolean isAllowCommentsEnabled(final ConfigurationContext
context) {
+ return
context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
}
@Override
@@ -153,9 +173,7 @@ public class JsonTreeReader extends SchemaRegistryService
implements RecordReade
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String
schemaAccessStrategy, final SchemaRegistry schemaRegistry, final
PropertyContext context) {
- final RecordSourceFactory<JsonNode> jsonSourceFactory =
- (var, in) -> new JsonRecordSource(in, startingFieldStrategy,
startingFieldName);
-
+ final RecordSourceFactory<JsonNode> jsonSourceFactory =
createJsonRecordSourceFactory();
final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier =
() -> new JsonSchemaInference(new
TimeValueInference(dateFormat, timeFormat, timestampFormat));
@@ -163,16 +181,23 @@ public class JsonTreeReader extends SchemaRegistryService
implements RecordReade
() -> super.getSchemaAccessStrategy(schemaAccessStrategy,
schemaRegistry, context));
}
+ protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
+ return (variables, in) -> new JsonRecordSource(in,
startingFieldStrategy, startingFieldName);
+ }
+
@Override
protected AllowableValue getDefaultSchemaAccessStrategy() {
return INFER_SCHEMA;
}
- @Override
public RecordReader createRecordReader(final Map<String, String>
variables, final InputStream in, final long inputLength, final ComponentLog
logger)
throws IOException, MalformedRecordException,
SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
+ return createJsonTreeRowRecordReader(in, logger, schema);
+ }
+
+ protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(final
InputStream in, final ComponentLog logger, final RecordSchema schema) throws
IOException, MalformedRecordException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat,
timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
- schemaApplicationStrategy, null, allowComments,
streamReadConstraints);
+ schemaApplicationStrategy, null, allowComments,
streamReadConstraints, new JsonParserFactory());
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java
new file mode 100644
index 0000000000..ab1a5682ed
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.yaml;
+
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.json.AbstractJsonRowRecordReader;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@Tags({"yaml", "tree", "record", "reader", "parser"})
+@CapabilityDescription("Parses YAML into individual Record objects. While the
reader expects each record "
+ + "to be well-formed YAML, the content of a FlowFile may consist of
many records, each as a well-formed "
+ + "YAML array or YAML object. "
+ + "If an array is encountered, each element in that array will be
treated as a separate record. "
+ + "If the schema that is configured contains a field that is not
present in the YAML, a null value will be used. If the YAML contains "
+ + "a field that is not present in the schema, that field will be
skipped. "
+ + "See the Usage of the Controller Service for more information and
examples.")
+public class YamlTreeReader extends JsonTreeReader {
+
+ private static final boolean ALLOW_COMMENTS_DISABLED = false;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
+ // Remove those properties which are not applicable for YAML
+ properties.remove(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
+ properties.remove(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
+
+ return properties;
+ }
+
+ @Override
+ protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
+ return (var, in) -> new YamlRecordSource(in, startingFieldStrategy,
startingFieldName);
+ }
+
+ @Override
+ protected JsonTreeRowRecordReader
createJsonTreeRowRecordReader(InputStream in, ComponentLog logger, RecordSchema
schema) throws IOException, MalformedRecordException {
+ return new YamlTreeRowRecordReader(in, logger, schema, dateFormat,
timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
+ schemaApplicationStrategy, null);
+ }
+
+ @Override
+ protected StreamReadConstraints buildStreamReadConstraints(final
ConfigurationContext context) {
+ return StreamReadConstraints.defaults();
+ }
+
+ @Override
+ protected boolean isAllowCommentsEnabled(final ConfigurationContext
context) {
+ return ALLOW_COMMENTS_DISABLED;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 7c391e2443..0b8484c3db 100755
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -38,3 +38,4 @@ org.apache.nifi.xml.XMLReader
org.apache.nifi.xml.XMLRecordSetWriter
org.apache.nifi.windowsevent.WindowsEventLogReader
org.apache.nifi.schema.inference.VolatileSchemaCache
+org.apache.nifi.yaml.YamlTreeReader
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
index f46b523758..d80f371913 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
@@ -411,7 +411,7 @@
<p>
When using JsonTreeReader with "Nested Field Strategy" and the
"Schema Access Strategy" is not "Infer Schema",
- it can be configured for the entire original JSON ("Whole JSON"
strategy) or for the nested field section ("Selected part" strategy).
+ it can be configured for the entire original JSON ("Whole
document" strategy) or for the nested field section ("Selected part" strategy).
</p>
</body>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.yaml.YamlTreeReader/additionalDetails.html
similarity index 83%
copy from
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
copy to
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.yaml.YamlTreeReader/additionalDetails.html
index f46b523758..ece3862e05 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.yaml.YamlTreeReader/additionalDetails.html
@@ -16,23 +16,23 @@
-->
<head>
<meta charset="utf-8"/>
- <title>JsonTreeReader</title>
+ <title>YamlTreeReader</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
</head>
<body>
<p>
- The JsonTreeReader Controller Service reads a JSON Object and
creates a Record object either for the
- entire JSON Object tree or a subpart (see "Starting Field
Strategies" section). The Controller Service
- must be configured with a Schema that describes the structure of
the JSON data. If any field exists in
- the JSON that is not in the schema, that field will be skipped. If
the schema contains a field for which
- no JSON field exists, a null value will be used in the Record (or
the default value defined in the schema,
+ The YamlTreeReader Controller Service reads a YAML Object and
creates a Record object either for the
+ entire YAML Object tree or a subpart (see "Starting Field
Strategies" section). The Controller Service
+ must be configured with a Schema that describes the structure of
the YAML data. If any field exists in
+ the YAML that is not in the schema, that field will be skipped. If
the schema contains a field for which
+ no YAML field exists, a null value will be used in the Record (or
the default value defined in the schema,
if applicable).
</p>
<p>
- If the root element of the JSON is a JSON Array, each JSON Object
within that array will be treated as
- its own separate Record. If the root element is a JSON Object, the
JSON will all be treated as a single
+ If the root element of the YAML is a YAML Array, each YAML Object
within that array will be treated as
+ its own separate Record. If the root element is a YAML Object, the
YAML will all be treated as a single
Record.
</p>
@@ -172,92 +172,128 @@
<h2>Starting Field Strategies</h2>
<p>
- When using JsonTreeReader, two different starting field strategies
can be selected. With the default Root Node strategy, the JsonTreeReader begins
processing from the root element
- of the JSON and creates a Record object for the entire JSON Object
tree, while the Nested Field strategy defines a nested field from which to
begin processing.
+ When using YamlTreeReader, two different starting field strategies
can be selected. With the default Root Node strategy, the YamlTreeReader begins
processing from the root element
+ of the YAML and creates a Record object for the entire YAML Object
tree, while the Nested Field strategy defines a nested field from which to
begin processing.
</p>
<p>
- Using the Nested Field strategy, a schema corresponding to the
nested JSON part should be specified. In case of schema inference, the
JsonTreeReader will automatically
+ Using the Nested Field strategy, a schema corresponding to the
nested YAML part should be specified. In case of schema inference, the
YamlTreeReader will automatically
infer a schema from nested records.
</p>
<h3>Root Node Strategy</h3>
<p>
- Consider the following JSON is read with the default Root Node
strategy:
+ Consider that the following YAML is read with the default Root Node strategy:
</p>
<code>
<pre>
-[
- {
- "id": 17,
- "name": "John",
- "child": {
- "id": "1"
- },
- "dob": "10-29-1982",
- "siblings": [
- {
- "name": "Jeremy",
- "id": 4
- },
- {
- "name": "Julia",
- "id": 8
- }
- ]
- },
- {
- "id": 98,
- "name": "Jane",
- "child": {
- "id": 2
- },
- "dob": "08-30-1984",
- "gender": "F",
- "siblingIds": [],
- "siblings": []
- }
-]
+- id: 17
+ name: John
+ child:
+ id: "1"
+ dob: 10-29-1982
+ siblings:
+ - name: Jeremy
+ id: 4
+ - name: Julia
+ id: 8
+- id: 98
+ name: Jane
+ child:
+ id: 2
+ dob: 08-30-1984
+ gender: F
+ siblingIds: []
+ siblings: []
</pre>
</code>
<p>
- Also, consider that the schema that is configured for this JSON is
as follows (assuming that the AvroSchemaRegistry
+ Also, consider that the schema that is configured for this YAML is
as follows (assuming that the AvroSchemaRegistry
Controller Service is chosen to denote the Schema):
</p>
<code>
<pre>
{
- "namespace": "nifi",
- "name": "person",
- "type": "record",
- "fields": [
- { "name": "id", "type": "int" },
- { "name": "name", "type": "string" },
- { "name": "gender", "type": "string" },
- { "name": "dob", "type": {
- "type": "int",
- "logicalType": "date"
- }},
- { "name": "siblings", "type": {
- "type": "array",
- "items": {
- "type": "record",
- "fields": [
- { "name": "name", "type": "string" }
- ]
+ "type": "record",
+ "name": "nifiRecord",
+ "namespace": "org.apache.nifi",
+ "fields": [
+ {
+ "name": "id",
+ "type": ["int","null"]
+ },
+ {
+ "name": "name",
+ "type": ["string","null"]
+ },
+ {
+ "name": "child",
+ "type": [
+ {
+ "type": "record",
+ "name": "childType",
+ "fields": [
+ {
+ "name": "id",
+ "type": ["int","string","null"]
}
- }}
- ]
+ ]
+ },
+ "null"
+ ]
+ },
+ {
+ "name": "dob",
+ "type": ["string","null"]
+ },
+ {
+ "name": "siblings",
+ "type": [
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "siblingsType",
+ "fields": [
+ {
+ "name": "name",
+ "type": ["string","null"]
+ },
+ {
+ "name": "id",
+ "type": ["int","null"]
+ }
+ ]
+ }
+ },
+ "null"
+ ]
+ },
+ {
+ "name": "gender",
+ "type": ["string","null"]
+ },
+ {
+ "name": "siblingIds",
+ "type": [
+ {
+ "type": "array",
+ "items": "string"
+ },
+ "null"
+ ]
+ }
+ ]
}
</pre>
</code>
<p>
Let us also assume that this Controller Service is configured with
the "Date Format" property set to "MM-dd-yyyy", as this
- matches the date format used for our JSON data. This will result
in the JSON creating two separate records, because the root
- element is a JSON array with two elements.
+ matches the date format used for our YAML data. This will result
in the YAML creating two separate records, because the root
+ element is a YAML array with two elements.
</p>
<p>
@@ -351,8 +387,8 @@
<h3>Nested Field Strategy</h3>
<p>
- Using the Nested Field strategy, consider the same JSON where the
specified Starting Field Name is
- "siblings". The schema that is configured for this JSON is as
follows:
+ Using the Nested Field strategy, consider the same YAML where the
specified Starting Field Name is
+ "siblings". The schema that is configured for this YAML is as
follows:
</p>
<code>
@@ -410,8 +446,8 @@
<h2>Schema Application Strategies</h2>
<p>
- When using JsonTreeReader with "Nested Field Strategy" and the
"Schema Access Strategy" is not "Infer Schema",
- it can be configured for the entire original JSON ("Whole JSON"
strategy) or for the nested field section ("Selected part" strategy).
+ When using YamlTreeReader with "Nested Field Strategy" and a "Schema Access Strategy" other than "Infer Schema",
+ the schema can be applied to the entire original YAML ("Whole document" strategy) or only to the nested field section ("Selected part" strategy).
</p>
</body>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 7c7f715eff..5f9e01ce12 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -326,7 +326,8 @@ class TestJsonTreeRowRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream(inputFile);
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat, allowComments, streamReadConstraints)) {
+ final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat,
+ null, null, null, null, allowComments,
streamReadConstraints, new JsonParserFactory())) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
similarity index 68%
copy from
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
copy to
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
index 7c7f715eff..843d16b68f 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
@@ -15,13 +15,15 @@
* limitations under the License.
*/
-package org.apache.nifi.json;
+package org.apache.nifi.yaml;
-import com.fasterxml.jackson.core.StreamReadConstraints;
-import com.fasterxml.jackson.core.exc.StreamConstraintsException;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.json.JsonSchemaInference;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
import org.apache.nifi.schema.inference.TimeValueInference;
@@ -36,17 +38,16 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.util.EqualsWrapper;
import org.apache.nifi.util.MockComponentLog;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -54,13 +55,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -68,7 +69,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-class TestJsonTreeRowRecordReader {
+class TestYamlTreeRowRecordReader {
+ private static final Path BANK_ACCOUNT_ARRAY =
Paths.get("src/test/resources/yaml/bank-account-array.yaml");
private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
private final String timestampFormat =
RecordFieldType.TIMESTAMP.getDefaultFormat();
@@ -101,13 +103,13 @@ class TestJsonTreeRowRecordReader {
@Test
void testReadChoiceOfStringOrArrayOfRecords() throws IOException,
MalformedRecordException {
final File schemaFile = new
File("src/test/resources/json/choice-of-string-or-array-record.avsc");
- final File jsonFile = new
File("src/test/resources/json/choice-of-string-or-array-record.json");
+ final File jsonFile = new
File("src/test/resources/yaml/choice-of-string-or-array-record.yaml");
final Schema avroSchema = new Schema.Parser().parse(schemaFile);
final RecordSchema recordSchema =
AvroTypeUtil.createSchema(avroSchema);
- try (final InputStream fis = new FileInputStream(jsonFile);
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema,
dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream fis = Files.newInputStream(jsonFile.toPath());
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema,
dateFormat, timeFormat, timestampFormat)) {
final Record record = reader.nextRecord();
final Object[] fieldsArray = record.getAsArray("fields");
@@ -127,65 +129,6 @@ class TestJsonTreeRowRecordReader {
final Record firstTypeRecord = (Record) firstType;
assertEquals("string", firstTypeRecord.getAsString("type"));
}
-
- }
-
- @Test
- @Disabled("Intended only for manual testing to determine performance
before/after modifications")
- void testPerformanceOnLocalFile() throws IOException,
MalformedRecordException {
- final RecordSchema schema = new
SimpleRecordSchema(Collections.emptyList());
-
- final File file = new
File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289");
- final byte[] data = Files.readAllBytes(file.toPath());
-
- final ComponentLog logger = mock(ComponentLog.class);
-
- int recordCount = 0;
- final int iterations = 1000;
-
- for (int j = 0; j < 5; j++) {
- final long start = System.nanoTime();
- for (int i = 0; i < iterations; i++) {
- try (final InputStream in = new ByteArrayInputStream(data);
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat,
timestampFormat)) {
- while (reader.nextRecord() != null) {
- recordCount++;
- }
- }
- }
- final long nanos = System.nanoTime() - start;
- final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
- System.out.println("Took " + millis + " millis to read " +
recordCount + " records");
- }
- }
-
- @Test
- @Disabled("Intended only for manual testing to determine performance
before/after modifications")
- void testPerformanceOnIndividualMessages() throws IOException,
MalformedRecordException {
- final RecordSchema schema = new
SimpleRecordSchema(Collections.emptyList());
-
- final File file = new
File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json");
- final byte[] data = Files.readAllBytes(file.toPath());
-
- final ComponentLog logger = mock(ComponentLog.class);
-
- int recordCount = 0;
- final int iterations = 1_000_000;
-
- for (int j = 0; j < 5; j++) {
- final long start = System.nanoTime();
- for (int i = 0; i < iterations; i++) {
- try (final InputStream in = new ByteArrayInputStream(data);
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat,
timestampFormat)) {
- while (reader.nextRecord() != null) {
- recordCount++;
- }
- }
- }
- final long nanos = System.nanoTime() - start;
- final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
- System.out.println("Took " + millis + " millis to read " +
recordCount + " records");
- }
}
@Test
@@ -193,8 +136,8 @@ class TestJsonTreeRowRecordReader {
final Schema avroSchema = new Schema.Parser().parse(new
File("src/test/resources/json/record-choice.avsc"));
final RecordSchema recordSchema =
AvroTypeUtil.createSchema(avroSchema);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/elements-for-record-choice.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/elements-for-record-choice.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat,
timeFormat, timestampFormat)) {
// evaluate first record
final Record firstRecord = reader.nextRecord();
@@ -241,15 +184,14 @@ class TestJsonTreeRowRecordReader {
assertNull(reader.nextRecord());
}
-
}
@Test
void testReadArray() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-array.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -271,12 +213,12 @@ class TestJsonTreeRowRecordReader {
}
@Test
- void testReadOneLinePerJSON() throws IOException, MalformedRecordException
{
+ void testReadYamlComments() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-oneline.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
-
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/bank-account-comments.yaml")))
{
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat,
+ null, null, null, null);
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
@@ -296,121 +238,6 @@ class TestJsonTreeRowRecordReader {
}
}
- @Test
- void testReadMultilineJSON() throws IOException, MalformedRecordException {
-
testReadAccountJson("src/test/resources/json/bank-account-multiline.json",
false, null);
- }
-
- @Test
- void testReadJSONStringTooLong() {
- final StreamConstraintsException mre =
assertThrows(StreamConstraintsException.class, () ->
-
testReadAccountJson("src/test/resources/json/bank-account-multiline.json",
false, StreamReadConstraints.builder().maxStringLength(2).build()));
- assertTrue(mre.getMessage().contains("maximum length"));
- assertTrue(mre.getMessage().contains("(2)"));
- }
-
- @Test
- void testReadJSONComments() throws IOException, MalformedRecordException {
-
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc",
true, StreamReadConstraints.builder().maxStringLength(20_000).build());
- }
-
- @Test
- void testReadJSONDisallowComments() {
- final MalformedRecordException mre =
assertThrows(MalformedRecordException.class, () ->
-
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc",
false, StreamReadConstraints.builder().maxStringLength(20_000).build()));
- assertTrue(mre.getMessage().contains("not parse"));
- }
-
- private void testReadAccountJson(final String inputFile, final boolean
allowComments, final StreamReadConstraints streamReadConstraints) throws
IOException, MalformedRecordException {
- final List<RecordField> fields =
getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
- final RecordSchema schema = new SimpleRecordSchema(fields);
-
- try (final InputStream in = new FileInputStream(inputFile);
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat, allowComments, streamReadConstraints)) {
-
- final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
- assertEquals(expectedFieldNames, fieldNames);
-
- final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DECIMAL, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
- assertEquals(expectedTypes, dataTypes);
-
- final Object[] firstRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {1, "John Doe",
BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"},
firstRecordValues);
-
- final Object[] secondRecordValues =
reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {2, "Jane Doe",
BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333",
"USA"}, secondRecordValues);
-
- assertNull(reader.nextRecord());
- }
- }
-
- @Test
- void testReadMultilineArrays() throws IOException,
MalformedRecordException {
- final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
-
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-multiarray.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
-
- final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
- assertEquals(expectedFieldNames, fieldNames);
-
- final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
- assertEquals(expectedTypes, dataTypes);
-
- final Object[] firstRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
-
- final Object[] secondRecordValues =
reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
-
- final Object[] thirdRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
-
- final Object[] fourthRecordValues =
reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
-
- assertNull(reader.nextRecord());
- }
- }
-
- @Test
- void testReadMixedJSON() throws IOException, MalformedRecordException {
- final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
-
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-mixed.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
-
- final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
- assertEquals(expectedFieldNames, fieldNames);
-
- final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING,
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING,
RecordFieldType.STRING);
- assertEquals(expectedTypes, dataTypes);
-
- final Object[] firstRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My
Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
-
- final Object[] secondRecordValues =
reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your
Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
-
- final Object[] thirdRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My
Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
-
- final Object[] fourthRecordValues =
reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your
Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
-
-
- assertNull(reader.nextRecord());
- }
- }
@Test
void testReadRawRecordIncludesFieldsNotInSchema() throws IOException,
MalformedRecordException {
@@ -419,8 +246,8 @@ class TestJsonTreeRowRecordReader {
fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-array.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Record schemaValidatedRecord = reader.nextRecord(true, true);
assertEquals(1, schemaValidatedRecord.getValue("id"));
@@ -428,8 +255,8 @@ class TestJsonTreeRowRecordReader {
assertNull(schemaValidatedRecord.getValue("balance"));
}
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-array.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id"));
@@ -451,8 +278,8 @@ class TestJsonTreeRowRecordReader {
final String expectedMap = "{id=1, name=John Doe, address=123 My
Street, city=My City, state=MS, zipCode=11111, country=USA,
account=MapRecord[{id=42, balance=4750.89}]}";
final String expectedRecord = String.format("MapRecord[%s]",
expectedMap);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/single-element-nested.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
@@ -470,8 +297,8 @@ class TestJsonTreeRowRecordReader {
fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-array.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Record schemaValidatedRecord = reader.nextRecord(true, true);
assertEquals("1", schemaValidatedRecord.getValue("id")); // will
be coerced into a STRING as per the schema
@@ -481,8 +308,8 @@ class TestJsonTreeRowRecordReader {
assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
}
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-array.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in = Files.newInputStream(BANK_ACCOUNT_ARRAY);
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id")); // will return raw
value of (int) 1
@@ -506,10 +333,10 @@ class TestJsonTreeRowRecordReader {
final String date = "2000-01-01";
final String datePattern = "yyyy-MM-dd";
- final String json = String.format("{ \"%s\": \"%s\" }", dateField,
date);
+ final String yaml = String.format("%s: %s", dateField, date);
for (final boolean coerceTypes : new boolean[] {true, false}) {
- try (final InputStream in = new
ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern,
timeFormat, timestampFormat)) {
+ try (final InputStream in = new
ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern,
timeFormat, timestampFormat)) {
final Record record = reader.nextRecord(coerceTypes, false);
final Object value = record.getValue(dateField);
@@ -525,8 +352,8 @@ class TestJsonTreeRowRecordReader {
final RecordSchema schema = new SimpleRecordSchema(recordFields);
for (final boolean coerceTypes : new boolean[] {true, false}) {
- try (final InputStream in = new
FileInputStream("src/test/resources/json/timestamp.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/timestamp.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, "yyyy/MM/dd HH:mm:ss")) {
final Record record = reader.nextRecord(coerceTypes, false);
final Object value = record.getValue("timestamp");
@@ -539,8 +366,8 @@ class TestJsonTreeRowRecordReader {
void testSingleJsonElement() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new
FileInputStream("src/test/resources/json/single-bank-account.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/single-bank-account.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -565,8 +392,8 @@ class TestJsonTreeRowRecordReader {
.map(f -> new RecordField(f.getFieldName(),
RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
final RecordSchema schema = new SimpleRecordSchema(choiceFields);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/single-bank-account.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/single-bank-account.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -596,8 +423,8 @@ class TestJsonTreeRowRecordReader {
fields.remove(new RecordField("balance",
RecordFieldType.DOUBLE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/single-element-nested.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final List<RecordFieldType> dataTypes =
schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
final List<RecordFieldType> expectedTypes =
Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
@@ -628,8 +455,8 @@ class TestJsonTreeRowRecordReader {
fields.remove(new RecordField("balance",
RecordFieldType.DOUBLE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/single-element-nested-array.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested-array.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "address", "city", "state", "zipCode", "country", "accounts");
@@ -655,8 +482,8 @@ class TestJsonTreeRowRecordReader {
void testReadArrayDifferentSchemas() throws IOException,
MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/bank-account-array-different-schemas.yaml"));
+ final JsonTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -684,8 +511,8 @@ class TestJsonTreeRowRecordReader {
void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws
IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new
FileInputStream("src/test/resources/json/bank-account-array-optional-balance.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/bank-account-array-optional-balance.yaml"));
+ final JsonTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id",
"name", "balance", "address", "city", "state", "zipCode", "country");
@@ -726,8 +553,8 @@ class TestJsonTreeRowRecordReader {
fields.add(new RecordField("from", fromType));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new
FileInputStream("src/test/resources/json/json-with-unicode.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/yaml-with-unicode.yaml"));
+ final JsonTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -752,8 +579,8 @@ class TestJsonTreeRowRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
MalformedRecordException mre =
assertThrows(MalformedRecordException.class, () -> {
- try (final InputStream in = new
FileInputStream("src/test/resources/json/single-bank-account-wrong-field-type.json");
- final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
+ try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/single-bank-account-wrong-field-type.yaml"));
+ final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
reader.nextRecord().getValues();
}
@@ -768,8 +595,7 @@ class TestJsonTreeRowRecordReader {
@Test
void testMergeOfSimilarRecords() throws Exception {
- // GIVEN
- String jsonPath = "src/test/resources/json/similar-records.json";
+ String yamlPath = "src/test/resources/yaml/similar-records.yaml";
RecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -782,33 +608,30 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedSchema, new HashMap<String, Object>(){{
+ new MapRecord(expectedSchema, new HashMap<>(){{
put("integer", 1);
put("boolean", true);
put("booleanOrString", true);
}}),
- new MapRecord(expectedSchema, new HashMap<String, Object>(){{
+ new MapRecord(expectedSchema, new HashMap<>(){{
put("integer", 2);
put("string", "stringValue2");
put("booleanOrString", "booleanOrStringValue2");
}})
);
- // WHEN
- // THEN
- testReadRecords(jsonPath, expected);
+ testReadRecords(yamlPath, expected);
}
@Test
void testChoiceOfEmbeddedSimilarRecords() throws Exception {
- // GIVEN
- String jsonPath =
"src/test/resources/json/choice-of-embedded-similar-records.json";
+ String yamlPath =
"src/test/resources/yaml/choice-of-embedded-similar-records.yaml";
- SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
));
- SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
@@ -820,42 +643,39 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
- put("record", new MapRecord(expectedRecordSchema1, new
HashMap<String, Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
+ put("record", new MapRecord(expectedRecordSchema1, new
HashMap<>() {{
put("integer", 1);
put("boolean", true);
}}));
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
- put("record", new MapRecord(expectedRecordSchema2, new
HashMap<String, Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
+ put("record", new MapRecord(expectedRecordSchema2, new
HashMap<>() {{
put("integer", 2);
put("string", "stringValue2");
}}));
}})
);
- // WHEN
- // THEN
- testReadRecords(jsonPath, expected);
+ testReadRecords(yamlPath, expected);
}
@Test
void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
- // GIVEN
- String jsonPath =
"src/test/resources/json/choice-of-embedded-arrays-and-single-records.json";
+ String yamlPath =
"src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml";
- SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Collections.singletonList(
+ final SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Collections.singletonList(
new RecordField("integer", RecordFieldType.INT.getDataType())
));
- SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
));
- SimpleRecordSchema expectedRecordSchema3 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema3 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- SimpleRecordSchema expectedRecordSchema4 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema4 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
@@ -869,66 +689,63 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
- put("record", new MapRecord(expectedRecordSchema1, new
HashMap<String, Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
+ put("record", new MapRecord(expectedRecordSchema1, new
HashMap<>() {{
put("integer", 1);
}}));
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new Object[]{
- new MapRecord(expectedRecordSchema2, new HashMap<String,
Object>() {{
- put("integer", 21);
- put("boolean", true);
- }}),
- new MapRecord(expectedRecordSchema2, new HashMap<String,
Object>() {{
- put("integer", 22);
- put("boolean", false);
- }})
+ new MapRecord(expectedRecordSchema2, new HashMap<>() {{
+ put("integer", 21);
+ put("boolean", true);
+ }}),
+ new MapRecord(expectedRecordSchema2, new HashMap<>() {{
+ put("integer", 22);
+ put("boolean", false);
+ }})
});
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
- put("record", new MapRecord(expectedRecordSchema3, new
HashMap<String, Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
+ put("record", new MapRecord(expectedRecordSchema3, new
HashMap<>() {{
put("integer", 3);
put("string", "stringValue3");
}}));
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new Object[]{
- new MapRecord(expectedRecordSchema4, new HashMap<String,
Object>() {{
- put("integer", 41);
- put("string", "stringValue41");
- }}),
- new MapRecord(expectedRecordSchema4, new HashMap<String,
Object>() {{
- put("integer", 42);
- put("string", "stringValue42");
- }})
+ new MapRecord(expectedRecordSchema4, new HashMap<>() {{
+ put("integer", 41);
+ put("string", "stringValue41");
+ }}),
+ new MapRecord(expectedRecordSchema4, new HashMap<>() {{
+ put("integer", 42);
+ put("string", "stringValue42");
+ }})
});
}})
);
- // WHEN
- // THEN
- testReadRecords(jsonPath, expected);
+ testReadRecords(yamlPath, expected);
}
@Test
void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
- // GIVEN
- String jsonPath =
"src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json";
+ String yamlPath =
"src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml";
- SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
));
- SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
));
- SimpleRecordSchema expectedRecordSchema3 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema3 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- SimpleRecordSchema expectedRecordSchema4 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema expectedRecordSchema4 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType()),
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
@@ -943,57 +760,54 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
- put("record", new MapRecord(expectedRecordSchema1, new
HashMap<String, Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
+ put("record", new MapRecord(expectedRecordSchema1, new
HashMap<>() {{
put("integer", 1);
put("boolean", false);
}}));
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new Object[]{
- new MapRecord(expectedRecordSchema2, new HashMap<String,
Object>() {{
- put("integer", 21);
- put("boolean", true);
- }}),
- new MapRecord(expectedRecordSchema2, new HashMap<String,
Object>() {{
- put("integer", 22);
- put("boolean", false);
- }})
+ new MapRecord(expectedRecordSchema2, new HashMap<>() {{
+ put("integer", 21);
+ put("boolean", true);
+ }}),
+ new MapRecord(expectedRecordSchema2, new HashMap<>() {{
+ put("integer", 22);
+ put("boolean", false);
+ }})
});
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
- put("record", new MapRecord(expectedRecordSchema3, new
HashMap<String, Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
+ put("record", new MapRecord(expectedRecordSchema3, new
HashMap<>() {{
put("integer", 3);
put("string", "stringValue3");
}}));
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new Object[]{
- new MapRecord(expectedRecordSchema4, new HashMap<String,
Object>() {{
- put("integer", 41);
- put("string", "stringValue41");
- }}),
- new MapRecord(expectedRecordSchema4, new HashMap<String,
Object>() {{
- put("integer", 42);
- put("string", "stringValue42");
- }}),
- new MapRecord(expectedRecordSchema4, new HashMap<String,
Object>() {{
- put("integer", 43);
- put("boolean", false);
- }})
+ new MapRecord(expectedRecordSchema4, new HashMap<>() {{
+ put("integer", 41);
+ put("string", "stringValue41");
+ }}),
+ new MapRecord(expectedRecordSchema4, new HashMap<>() {{
+ put("integer", 42);
+ put("string", "stringValue42");
+ }}),
+ new MapRecord(expectedRecordSchema4, new HashMap<>() {{
+ put("integer", 43);
+ put("boolean", false);
+ }})
});
}})
);
- // WHEN
- // THEN
- testReadRecords(jsonPath, expected);
+ testReadRecords(yamlPath, expected);
}
@Test
void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
- // GIVEN
- String jsonPath =
"src/test/resources/json/choice-of-different-arrays-with-extra-fields.json";
+ String yamlPath =
"src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml";
SimpleRecordSchema recordSchema1 = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -1035,47 +849,43 @@ class TestJsonTreeRowRecordReader {
// Since the actual arrays have records with either (INT, BOOLEAN,
STRING) or (INT, STRING, STRING)
// while the explicit schema defines only (INT, BOOLEAN) and (INT,
STRING) we can't tell which record schema to chose
// so we take the first one (INT, BOOLEAN) - as best effort - for
both cases
- SimpleRecordSchema expectedSelectedRecordSchemaForRecordsInBothArrays
= expectedChildSchema1;
-
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new Object[]{
- new
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new
HashMap<String, Object>() {{
- put("integer", 11);
- put("boolean", true);
- put("extraString", "extraStringValue11");
- }}),
- new
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new
HashMap<String, Object>() {{
- put("integer", 12);
- put("boolean", false);
- put("extraString", "extraStringValue12");
- }})
+ new MapRecord(expectedChildSchema1, new HashMap<>() {{
+ put("integer", 11);
+ put("boolean", true);
+ put("extraString", "extraStringValue11");
+ }}),
+ new MapRecord(expectedChildSchema1, new HashMap<>() {{
+ put("integer", 12);
+ put("boolean", false);
+ put("extraString", "extraStringValue12");
+ }})
});
}}),
- new MapRecord(expectedRecordChoiceSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new Object[]{
- new
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new
HashMap<String, Object>() {{
- put("integer", 21);
- put("extraString", "extraStringValue21");
- put("string", "stringValue21");
- }}),
- new
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new
HashMap<String, Object>() {{
- put("integer", 22);
- put("extraString", "extraStringValue22");
- put("string", "stringValue22");
- }})
+ new MapRecord(expectedChildSchema1, new HashMap<>() {{
+ put("integer", 21);
+ put("extraString", "extraStringValue21");
+ put("string", "stringValue21");
+ }}),
+ new MapRecord(expectedChildSchema1, new HashMap<>() {{
+ put("integer", 22);
+ put("extraString", "extraStringValue22");
+ put("string", "stringValue22");
+ }})
});
}})
);
- // WHEN
- // THEN
- testReadRecords(jsonPath, schema, expected);
+ testReadRecords(yamlPath, schema, expected);
}
@Test
void testStartFromNestedArray() throws IOException,
MalformedRecordException {
- String jsonPath =
"src/test/resources/json/single-element-nested-array.json";
+ String yamlPath =
"src/test/resources/yaml/single-element-nested-array.yaml";
SimpleRecordSchema expectedRecordSchema = new
SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1083,22 +893,22 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("id", 42);
put("balance", 4750.89);
}}),
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("id", 43);
put("balance", 48212.38);
}})
);
- testReadRecords(jsonPath, expected,
StartingFieldStrategy.NESTED_FIELD, "accounts");
+ testNestedReadRecords(yamlPath, expected, "accounts");
}
@Test
void testStartFromNestedObject() throws IOException,
MalformedRecordException {
- String jsonPath = "src/test/resources/json/single-element-nested.json";
+ String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
SimpleRecordSchema expectedRecordSchema = new
SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1106,18 +916,18 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Collections.singletonList(
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("id", 42);
put("balance", 4750.89);
}})
);
- testReadRecords(jsonPath, expected,
StartingFieldStrategy.NESTED_FIELD, "account");
+ testNestedReadRecords(yamlPath, expected, "account");
}
@Test
void testStartFromMultipleNestedField() throws IOException,
MalformedRecordException {
- String jsonPath = "src/test/resources/json/multiple-nested-field.json";
+ String yamlPath = "src/test/resources/yaml/multiple-nested-field.yaml";
SimpleRecordSchema expectedRecordSchema = new
SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.STRING.getDataType()),
@@ -1125,40 +935,39 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("id", "n312kj3");
put("type", "employee");
}}),
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("id", "dl2kdff");
put("type", "security");
}})
);
- testReadRecords(jsonPath, expected,
StartingFieldStrategy.NESTED_FIELD, "accountIds");
+ testNestedReadRecords(yamlPath, expected, "accountIds");
}
@Test
void testStartFromSimpleFieldReturnsEmptyJson() throws IOException,
MalformedRecordException {
- String jsonPath = "src/test/resources/json/single-element-nested.json";
+ String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
- testReadRecords(jsonPath, Collections.emptyList(),
StartingFieldStrategy.NESTED_FIELD, "name");
+ testNestedReadRecords(yamlPath, Collections.emptyList(), "name");
}
@Test
void testStartFromNonExistentFieldWithDefinedSchema() throws IOException,
MalformedRecordException {
- String jsonPath = "src/test/resources/json/single-element-nested.json";
+ String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
SimpleRecordSchema expectedRecordSchema = new
SimpleRecordSchema(getDefaultFields());
List<Object> expected = Collections.emptyList();
- testReadRecords(jsonPath, expectedRecordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
- "notfound", SchemaApplicationStrategy.SELECTED_PART);
+ testNestedReadRecords(yamlPath, expectedRecordSchema, expected,
"notfound", SchemaApplicationStrategy.SELECTED_PART);
}
@Test
void testStartFromNestedFieldThenStartObject() throws IOException,
MalformedRecordException {
- String jsonPath =
"src/test/resources/json/nested-array-then-start-object.json";
+ String yamlPath =
"src/test/resources/yaml/nested-array-then-start-object.yaml";
SimpleRecordSchema expectedRecordSchema = new
SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1166,23 +975,22 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("id", 42);
put("balance", 4750.89);
}}),
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>(){{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("id", 43);
put("balance", 48212.38);
}})
);
- testReadRecords(jsonPath, expectedRecordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
- "accounts", SchemaApplicationStrategy.SELECTED_PART);
+ testNestedReadRecords(yamlPath, expectedRecordSchema, expected,
"accounts", SchemaApplicationStrategy.SELECTED_PART);
}
@Test
void testStartFromNestedObjectWithWholeJsonSchemaScope() throws
IOException, MalformedRecordException {
- String jsonPath = "src/test/resources/json/single-element-nested.json";
+ String yamlPath = "src/test/resources/yaml/single-element-nested.yaml";
RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1193,22 +1001,19 @@ class TestJsonTreeRowRecordReader {
new RecordField("account",
RecordFieldType.RECORD.getRecordDataType(accountSchema))
));
- RecordSchema expectedRecordSchema = accountSchema;
-
List<Object> expected = Collections.singletonList(
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ new MapRecord(accountSchema, new HashMap<>() {{
put("id", 42);
put("balance", 4750.89);
}})
);
- testReadRecords(jsonPath, recordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
- "account", SchemaApplicationStrategy.WHOLE_JSON);
+ testNestedReadRecords(yamlPath, recordSchema, expected, "account",
SchemaApplicationStrategy.WHOLE_JSON);
}
@Test
void testStartFromNestedArrayWithWholeJsonSchemaScope() throws
IOException, MalformedRecordException {
- String jsonPath =
"src/test/resources/json/single-element-nested-array.json";
+ String yamlPath =
"src/test/resources/yaml/single-element-nested-array.yaml";
RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
@@ -1219,26 +1024,23 @@ class TestJsonTreeRowRecordReader {
new RecordField("accounts",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountSchema)))
));
- RecordSchema expectedRecordSchema = accountSchema;
-
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ new MapRecord(accountSchema, new HashMap<>() {{
put("id", 42);
put("balance", 4750.89);
}}),
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ new MapRecord(accountSchema, new HashMap<>() {{
put("id", 43);
put("balance", 48212.38);
}})
);
- testReadRecords(jsonPath, recordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
- "accounts", SchemaApplicationStrategy.WHOLE_JSON);
+ testNestedReadRecords(yamlPath, recordSchema, expected, "accounts",
SchemaApplicationStrategy.WHOLE_JSON);
}
@Test
void testStartFromDeepNestedObject() throws IOException,
MalformedRecordException {
- String jsonPath =
"src/test/resources/json/single-element-deep-nested.json";
+ String yamlPath =
"src/test/resources/yaml/single-element-deep-nested.yaml";
RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("rootInt", RecordFieldType.INT.getDataType()),
@@ -1263,14 +1065,13 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Collections.singletonList(
- new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("nestedLevel2Int", 111);
put("nestedLevel2String", "root.level1.level2:string");
}})
);
- testReadRecords(jsonPath, recordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
- "nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
+ testNestedReadRecords(yamlPath, recordSchema, expected,
"nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
}
@Test
@@ -1307,22 +1108,27 @@ class TestJsonTreeRowRecordReader {
new RecordField("country",
RecordFieldType.STRING.getDataType())
));
- try (InputStream in = new
FileInputStream("src/test/resources/json/capture-fields.json")) {
- JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(
+ try (InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/capture-fields.yaml")))
{
+ YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(
in, mock(ComponentLog.class), recordSchema,
dateFormat, timeFormat, timestampFormat,
StartingFieldStrategy.NESTED_FIELD, startingFieldName,
SchemaApplicationStrategy.SELECTED_PART, capturePredicate);
- while (reader.nextRecord() != null);
+ int records = 0;
+ while (reader.nextRecord() != null) {
+ records++;
+ }
+ assertNotEquals(0, records);
+
Map<String, String> capturedFields = reader.getCapturedFields();
assertEquals(expectedCapturedFields, capturedFields);
}
}
- private void testReadRecords(String jsonPath, List<Object> expected)
throws IOException, MalformedRecordException {
- final File jsonFile = new File(jsonPath);
+ private void testReadRecords(String yamlPath, List<Object> expected)
throws IOException, MalformedRecordException {
+ final File jsonFile = new File(yamlPath);
try (
InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
) {
@@ -1331,42 +1137,37 @@ class TestJsonTreeRowRecordReader {
}
}
- private void testReadRecords(String jsonPath,
- List<Object> expected,
- StartingFieldStrategy strategy,
- String startingFieldName)
- throws IOException, MalformedRecordException {
+ private void testNestedReadRecords(String yamlPath, List<Object> expected,
String startingFieldName) throws IOException, MalformedRecordException {
- final File jsonFile = new File(jsonPath);
+ final File jsonFile = new File(yamlPath);
try (InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
- RecordSchema schema = inferSchema(jsonStream, strategy,
startingFieldName);
- testReadRecords(jsonStream, schema, expected, strategy,
startingFieldName, SchemaApplicationStrategy.SELECTED_PART);
+ RecordSchema schema = inferSchema(jsonStream,
StartingFieldStrategy.NESTED_FIELD, startingFieldName);
+ testNestedReadRecords(jsonStream, schema, expected,
startingFieldName, SchemaApplicationStrategy.SELECTED_PART);
}
}
- private void testReadRecords(String jsonPath, RecordSchema schema,
List<Object> expected) throws IOException, MalformedRecordException {
- final File jsonFile = new File(jsonPath);
+ private void testReadRecords(String yamlPath, RecordSchema schema,
List<Object> expected) throws IOException, MalformedRecordException {
+ final File jsonFile = new File(yamlPath);
try (InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
testReadRecords(jsonStream, schema, expected);
}
}
- private void testReadRecords(String jsonPath,
+ private void testNestedReadRecords(String yamlPath,
RecordSchema schema,
List<Object> expected,
- StartingFieldStrategy strategy,
String startingFieldName,
SchemaApplicationStrategy
schemaApplicationStrategy
) throws IOException, MalformedRecordException {
- final File jsonFile = new File(jsonPath);
- try (InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
- testReadRecords(jsonStream, schema, expected, strategy,
startingFieldName, schemaApplicationStrategy);
+ final File yamlFile = new File(yamlPath);
+ try (InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(yamlFile))) {
+ testNestedReadRecords(jsonStream, schema, expected,
startingFieldName, schemaApplicationStrategy);
}
}
- private void testReadRecords(InputStream jsonStream, RecordSchema schema,
List<Object> expected) throws IOException, MalformedRecordException {
- try (JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema,
dateFormat, timeFormat, timestampFormat)) {
+ private void testReadRecords(InputStream yamlStream, RecordSchema schema,
List<Object> expected) throws IOException, MalformedRecordException {
+ try (YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(yamlStream, mock(ComponentLog.class), schema,
dateFormat, timeFormat, timestampFormat)) {
List<Object> actual = new ArrayList<>();
Record record;
while ((record = reader.nextRecord()) != null) {
@@ -1392,16 +1193,15 @@ class TestJsonTreeRowRecordReader {
}
}
- private void testReadRecords(InputStream jsonStream,
+ private void testNestedReadRecords(InputStream yamlStream,
RecordSchema schema,
List<Object> expected,
- StartingFieldStrategy strategy,
String startingFieldName,
SchemaApplicationStrategy
schemaApplicationStrategy)
throws IOException, MalformedRecordException {
- try (JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema,
dateFormat, timeFormat, timestampFormat,
- strategy, startingFieldName, schemaApplicationStrategy, null))
{
+ try (YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(yamlStream, mock(ComponentLog.class), schema,
dateFormat, timeFormat, timestampFormat,
+ StartingFieldStrategy.NESTED_FIELD, startingFieldName,
schemaApplicationStrategy, null)) {
List<Object> actual = new ArrayList<>();
Record record;
@@ -1429,7 +1229,7 @@ class TestJsonTreeRowRecordReader {
private RecordSchema inferSchema(InputStream jsonStream,
StartingFieldStrategy strategy, String startingFieldName) throws IOException {
RecordSchema schema = new InferSchemaAccessStrategy<>(
- (__, inputStream) -> new JsonRecordSource(inputStream, strategy,
startingFieldName),
+ (__, inputStream) -> new YamlRecordSource(inputStream, strategy,
startingFieldName),
new JsonSchemaInference(new TimeValueInference(null, null, null)),
mock(ComponentLog.class)
).getSchema(Collections.emptyMap(), jsonStream, null);
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-different-schemas.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-different-schemas.yaml
new file mode 100644
index 0000000000..168771b317
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-different-schemas.yaml
@@ -0,0 +1,24 @@
+- id: 1
+ name: John Doe
+ balance: 4750.89
+ address: 123 My Street
+ city: My City
+ state: MS
+ zipCode: "11111"
+ country: USA
+- id: 2
+ name: Jane Doe
+ balance: 4820.09
+ address: 321 Your Street
+ city: Your City
+ state: NY
+ zipCode: "33333"
+- id: 3
+ name: Jake Doe
+ balance: 4751.89
+ address: 124 My Street
+ address2: 'Apt. #12'
+ city: My City
+ state: MS
+ zipCode: "11111"
+ country: USA
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-optional-balance.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-optional-balance.yaml
new file mode 100644
index 0000000000..c7e404564a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array-optional-balance.yaml
@@ -0,0 +1,23 @@
+- id: 1
+ name: John Doe
+ balance: 4750.89
+ address: 123 My Street
+ city: My City
+ state: MS
+ zipCode: "11111"
+ country: USA
+- id: 2
+ name: Jane Doe
+ balance: null
+ address: 321 Your Street
+ city: Your City
+ state: NY
+ zipCode: "33333"
+ country: USA
+- id: 3
+ name: Jimmy Doe
+ address: 321 Your Street
+ city: Your City
+ state: NY
+ zipCode: "33333"
+ country: USA
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array.yaml
new file mode 100644
index 0000000000..b36049387c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-array.yaml
@@ -0,0 +1,16 @@
+- id: 1
+ name: John Doe
+ balance: 4750.89
+ address: 123 My Street
+ city: My City
+ state: MS
+ zipCode: "11111"
+ country: USA
+- id: 2
+ name: Jane Doe
+ balance: 4820.09
+ address: 321 Your Street
+ city: Your City
+ state: NY
+ zipCode: "33333"
+ country: USA
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-comments.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-comments.yaml
new file mode 100644
index 0000000000..c73597c549
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/bank-account-comments.yaml
@@ -0,0 +1,20 @@
+# comment before objects
+- id: 1
+ name: John Doe
+ # comment in object
+ balance: 4750.89
+ address: 123 My Street
+ city: My City
+ state: MS
+ zipCode: "11111"
+ country: USA
+# comment between objects
+- id: 2
+ name: Jane Doe
+ balance: 4820.09
+ address: 321 Your Street
+ city: Your City
+ state: NY
+ zipCode: "33333"
+ country: USA
+# Comment after objects
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/capture-fields.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/capture-fields.yaml
new file mode 100644
index 0000000000..5ed3412d0c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/capture-fields.yaml
@@ -0,0 +1,15 @@
+id: 1
+accounts:
+ - id: 42
+ balance: 4750.89
+ - id: 43
+ balance: 48212.38
+name: John Doe
+address: 123 My Street
+city: My City
+job:
+ salary: 115431
+ position: acountant
+state: MS
+zipCode: "11111"
+country: USA
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml
new file mode 100644
index 0000000000..880166a76c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-different-arrays-with-extra-fields.yaml
@@ -0,0 +1,15 @@
+dataCollection:
+ - record:
+ - integer: 11
+ boolean: true
+ extraString: extraStringValue11
+ - integer: 12
+ boolean: false
+ extraString: extraStringValue12
+ - record:
+ - integer: 21
+ string: stringValue21
+ extraString: extraStringValue21
+ - integer: 22
+ string: stringValue22
+ extraString: extraStringValue22
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml
new file mode 100644
index 0000000000..7eada6d9d1
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml
@@ -0,0 +1,16 @@
+dataCollection:
+ - record:
+ integer: 1
+ - record:
+ - integer: 21
+ boolean: true
+ - integer: 22
+ boolean: false
+ - record:
+ integer: 3
+ string: stringValue3
+ - record:
+ - integer: 41
+ string: stringValue41
+ - integer: 42
+ string: stringValue42
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-similar-records.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-similar-records.yaml
new file mode 100644
index 0000000000..826e064b98
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-embedded-similar-records.yaml
@@ -0,0 +1,7 @@
+dataCollection:
+ - record:
+ integer: 1
+ boolean: true
+ - record:
+ integer: 2
+ string: stringValue2
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
new file mode 100644
index 0000000000..603d3ddbbb
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
@@ -0,0 +1,19 @@
+dataCollection:
+ - record:
+ integer: 1
+ boolean: false
+ - record:
+ - integer: 21
+ boolean: true
+ - integer: 22
+ boolean: false
+ - record:
+ integer: 3
+ string: stringValue3
+ - record:
+ - integer: 41
+ string: stringValue41
+ - integer: 42
+ string: stringValue42
+ - integer: 43
+ boolean: false
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-string-or-array-record.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-string-or-array-record.yaml
new file mode 100644
index 0000000000..8274e9c158
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-string-or-array-record.yaml
@@ -0,0 +1,4 @@
+fields:
+ - type: string
+ - type:
+ - type: string
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/elements-for-record-choice.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/elements-for-record-choice.yaml
new file mode 100644
index 0000000000..615fefa854
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/elements-for-record-choice.yaml
@@ -0,0 +1,6 @@
+- id: "1234"
+ child:
+ id: "4321"
+- id: "1234"
+ child:
+ name: child
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/multiple-nested-field.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/multiple-nested-field.yaml
new file mode 100644
index 0000000000..558ece7588
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/multiple-nested-field.yaml
@@ -0,0 +1,13 @@
+- id: 1
+ name: John Doe
+ balance: 4750.89
+ address: 123 My Street
+ city: My City
+ state: MS
+ zipCode: "11111"
+ country: USA
+ accountIds:
+ - id: n312kj3
+ type: employee
+ - id: dl2kdff
+ type: security
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/nested-array-then-start-object.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/nested-array-then-start-object.yaml
new file mode 100644
index 0000000000..133bb10998
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/nested-array-then-start-object.yaml
@@ -0,0 +1,9 @@
+- id: 17
+ name: John
+ accounts:
+ - id: 42
+ balance: 4750.89
+ - id: 43
+ balance: 48212.38
+- id: 98
+ balance: 67829.12
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/similar-records.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/similar-records.yaml
new file mode 100644
index 0000000000..7478e59a4a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/similar-records.yaml
@@ -0,0 +1,7 @@
+dataCollection:
+ - integer: 1
+ boolean: true
+ booleanOrString: true
+ - integer: 2
+ string: stringValue2
+ booleanOrString: booleanOrStringValue2
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account-wrong-field-type.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account-wrong-field-type.yaml
new file mode 100644
index 0000000000..5088aeeced
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account-wrong-field-type.yaml
@@ -0,0 +1,10 @@
+id: 1
+name: John Doe
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
+account:
+ id: 42
+ balance: true
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account.yaml
new file mode 100644
index 0000000000..8d3c6d1fb6
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-bank-account.yaml
@@ -0,0 +1,8 @@
+id: 1
+name: John Doe
+balance: 4750.89
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-deep-nested.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-deep-nested.yaml
new file mode 100644
index 0000000000..af0458706b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-deep-nested.yaml
@@ -0,0 +1,8 @@
+rootInt: 100
+rootString: root_string
+nestedLevel1Record:
+ nestedLevel1Int: 110
+ nestedLevel1String: root.level1:string
+ nestedLevel2Record:
+ nestedLevel2Int: 111
+ nestedLevel2String: root.level1.level2:string
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested-array.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested-array.yaml
new file mode 100644
index 0000000000..6da7cddedc
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested-array.yaml
@@ -0,0 +1,12 @@
+id: 1
+name: John Doe
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
+accounts:
+ - id: 42
+ balance: 4750.89
+ - id: 43
+ balance: 48212.38
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested.yaml
new file mode 100644
index 0000000000..c452c2f0e4
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/single-element-nested.yaml
@@ -0,0 +1,10 @@
+id: 1
+name: John Doe
+address: 123 My Street
+city: My City
+state: MS
+zipCode: "11111"
+country: USA
+account:
+ id: 42
+ balance: 4750.89
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/timestamp.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/timestamp.yaml
new file mode 100644
index 0000000000..aa94607246
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/timestamp.yaml
@@ -0,0 +1,2 @@
+timestamp: 2019/06/27 13:04:04
+field_not_in_schema: some_value
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/yaml-with-unicode.yaml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/yaml-with-unicode.yaml
new file mode 100644
index 0000000000..5dfe632940
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/yaml-with-unicode.yaml
@@ -0,0 +1,6 @@
+created_at: Thu Feb 16 01:19:42 +0000 2017
+id: 832036744985577473
+unicode: ちゃ泣きそう
+from:
+ id: 788946702264507903
+ name: john