This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ece597b523 NIFI-8269 - Add support for schema inference in ForkRecord
processor when extracting array records
ece597b523 is described below
commit ece597b523ecdf3113077b6412115ba42ad67fa3
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Aug 3 19:54:02 2025 +0200
NIFI-8269 - Add support for schema inference in ForkRecord processor when
extracting array records
* Only compute the writer schema from the reader schema if the writer is
configured to inherit the schema from the reader
---
.../nifi-standard-processors/pom.xml | 4 +
.../nifi/processors/standard/ForkRecord.java | 245 +++++++++++++++------
.../nifi/processors/standard/TestForkRecord.java | 90 ++++++++
.../input/complex-input-json-for-inference.json | 116 ++++++++++
.../output/extract-address-with-parents.json | 121 ++++++++++
.../output/extract-address-without-parents.json | 21 ++
.../output/extract-bank-accounts-with-parents.json | 108 +++++++++
7 files changed, 640 insertions(+), 65 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 98f308d340..d15f012dce 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -618,6 +618,10 @@
<exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
<exclude>src/test/resources/TestExtractRecordSchema/name_age_schema.avsc</exclude>
<exclude>src/test/resources/TestForkRecord/input/complex-input-json.json</exclude>
+
<exclude>src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json</exclude>
+
<exclude>src/test/resources/TestForkRecord/output/extract-address-without-parents.json</exclude>
+
<exclude>src/test/resources/TestForkRecord/output/extract-address-with-parents.json</exclude>
+
<exclude>src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json</exclude>
<exclude>src/test/resources/TestForkRecord/output/extract-transactions.json</exclude>
<exclude>src/test/resources/TestForkRecord/output/split-address.json</exclude>
<exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude>
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
index 771979b8df..1646182e12 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
@@ -54,12 +54,17 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException;
import java.io.InputStream;
@@ -68,6 +73,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -244,103 +250,212 @@ public class ForkRecord extends AbstractProcessor {
public void process(final InputStream in) throws IOException {
try (final RecordReader reader =
readerFactory.createRecordReader(originalAttributes, in, original.getSize(),
getLogger())) {
- final RecordSchema writeSchema =
writerFactory.getSchema(originalAttributes, reader.getSchema());
- final OutputStream out = session.write(outFlowFile);
+ final Record firstRecord = reader.nextRecord();
- try (final RecordSetWriter recordSetWriter =
writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) {
+ final RecordSchema readerSchema = reader.getSchema();
+ final RecordSchema configuredWriterSchema =
writerFactory.getSchema(originalAttributes, readerSchema);
+
+ // we compute the write schema only if the writer is
configured to inherit the
+ // reader schema
+ final RecordSchema writeSchema;
+ if (configuredWriterSchema == readerSchema) {
+ final RecordSchema derivedSchema =
determineWriteSchema(firstRecord, readerSchema);
+ writeSchema =
writerFactory.getSchema(originalAttributes, derivedSchema);
+ } else {
+ writeSchema = configuredWriterSchema;
+ }
+
+ try (final OutputStream out =
session.write(outFlowFile);
+ final RecordSetWriter recordSetWriter =
writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) {
recordSetWriter.beginRecordSet();
- // we read each record of the input flow file
+ if (firstRecord != null) {
+ writeForkedRecords(firstRecord,
recordSetWriter, writeSchema);
+ readCount.incrementAndGet();
+ }
+
Record record;
while ((record = reader.nextRecord()) != null) {
-
readCount.incrementAndGet();
+ writeForkedRecords(record, recordSetWriter,
writeSchema);
+ }
- for (RecordPath recordPath : recordPaths) {
+ final WriteResult writeResult =
recordSetWriter.finishRecordSet();
- // evaluate record path in each record of
the flow file
- Iterator<FieldValue> it =
recordPath.evaluate(record).getSelectedFields().iterator();
+ try {
+ recordSetWriter.close();
+ } catch (final IOException ioe) {
+ getLogger().warn("Failed to close Writer for
{}", outFlowFile);
+ }
- while (it.hasNext()) {
- FieldValue fieldValue = it.next();
- RecordFieldType fieldType =
fieldValue.getField().getDataType().getFieldType();
+ final Map<String, String> attributes = new
HashMap<>();
+ writeCount.set(writeResult.getRecordCount());
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
recordSetWriter.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+
session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
+ }
- // we want to have an array here,
nothing else allowed
- if (fieldType !=
RecordFieldType.ARRAY) {
- getLogger().debug("The record path
{} is matching a field of type {} when the type ARRAY is expected.",
recordPath.getPath(), fieldType);
- continue;
+ } catch (final SchemaNotFoundException |
MalformedRecordException e) {
+ throw new ProcessException("Could not parse incoming
data: " + e.getLocalizedMessage(), e);
+ }
+ }
+
+ private RecordSchema determineWriteSchema(final Record
firstRecord, final RecordSchema readerSchema) throws SchemaNotFoundException,
IOException {
+ if (isSplitMode || firstRecord == null) {
+ return readerSchema;
+ }
+
+ final Map<String, RecordField> fieldMap = new
LinkedHashMap<>();
+
+ for (RecordPath recordPath : recordPaths) {
+ final Iterator<FieldValue> iterator =
recordPath.evaluate(firstRecord).getSelectedFields().iterator();
+ while (iterator.hasNext()) {
+ final FieldValue fieldValue = iterator.next();
+ Object fieldObject = fieldValue.getValue();
+ if (fieldObject instanceof List<?>) {
+ fieldObject = ((List<?>)
fieldObject).toArray();
+ }
+
+ DataType dataType =
fieldValue.getField().getDataType();
+ if (dataType.getFieldType() ==
RecordFieldType.CHOICE) {
+ DataType chosen = null;
+ if (fieldObject != null) {
+ chosen =
DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType);
+ }
+ if (chosen == null) {
+ for (final DataType possible :
((ChoiceDataType) dataType).getPossibleSubTypes()) {
+ if (((ArrayDataType)
possible).getElementType().getFieldType() == RecordFieldType.RECORD) {
+ chosen = possible;
+ break;
}
- if (fieldValue.getValue() == null) {
- getLogger().debug("The record path
{} is matching a field the value of which is null.", recordPath.getPath());
- continue;
+ if (chosen == null) {
+ chosen = possible;
}
+ }
+ }
+ if (chosen != null) {
+ dataType = chosen;
+ }
+ }
- if (isSplitMode) {
+ if (!(dataType instanceof ArrayDataType)) {
+ continue;
+ }
- Object[] items = (Object[])
fieldValue.getValue();
- for (Object item : items) {
- fieldValue.updateValue(new
Object[]{item});
- recordSetWriter.write(record);
- }
+ final ArrayDataType arrayDataType =
(ArrayDataType) dataType;
+ final DataType elementType =
arrayDataType.getElementType();
- } else {
+ if (elementType.getFieldType() !=
RecordFieldType.RECORD) {
+ continue;
+ }
- // we get the type of the elements
of the array
- final ArrayDataType arrayDataType
= (ArrayDataType) fieldValue.getField().getDataType();
- final DataType elementType =
arrayDataType.getElementType();
+ final RecordSchema elementSchema =
((RecordDataType) elementType).getChildSchema();
+ for (final RecordField elementField :
elementSchema.getFields()) {
+ fieldMap.put(elementField.getFieldName(),
elementField);
+ }
- // we want to have records in the
array
- if (elementType.getFieldType() !=
RecordFieldType.RECORD) {
- getLogger().debug("The record
path {} is matching an array field with values of type {} when the type RECORD
is expected.",
- recordPath.getPath(),
elementType.getFieldType());
- continue;
- }
+ if (addParentFields) {
+ addParentFieldSchemas(fieldMap, fieldValue);
+ }
+ }
+ }
- Object[] records = (Object[])
fieldValue.getValue();
- for (Object elementRecord :
records) {
+ final RecordSchema schema = new SimpleRecordSchema(new
ArrayList<>(fieldMap.values()));
+ return writerFactory.getSchema(originalAttributes, schema);
+ }
- if (elementRecord == null) {
- continue;
- }
+ private void addParentFieldSchemas(final Map<String,
RecordField> fieldMap, final FieldValue fieldValue) {
+ try {
+ final FieldValue parentField =
fieldValue.getParent().get();
+ final Record parentRecord =
fieldValue.getParentRecord().get();
- Record recordToWrite =
(Record) elementRecord;
+ for (final RecordField field :
parentRecord.getSchema().getFields()) {
+ if
(!field.getFieldName().equals(fieldValue.getField().getFieldName())) {
+ fieldMap.putIfAbsent(field.getFieldName(),
field);
+ }
+ }
- if (addParentFields) {
- // in this case we want to
recursively add the parent fields into the record to write
- // but we need to ensure
that the Record has the appropriate schema for that
-
recordToWrite.incorporateSchema(writeSchema);
-
recursivelyAddParentFields(recordToWrite, fieldValue);
- }
+ addParentFieldSchemas(fieldMap, parentField);
+ } catch (NoSuchElementException e) {
+ return; // No parent field, nothing to do
+ }
+ }
-
recordSetWriter.write(recordToWrite);
- }
+ private void writeForkedRecords(final Record record, final
RecordSetWriter recordSetWriter, final RecordSchema writeSchema) throws
IOException {
+ for (RecordPath recordPath : recordPaths) {
+ final Iterator<FieldValue> it =
recordPath.evaluate(record).getSelectedFields().iterator();
- }
+ while (it.hasNext()) {
+ final FieldValue fieldValue = it.next();
+ Object fieldObject = fieldValue.getValue();
+ if (fieldObject instanceof List<?>) {
+ fieldObject = ((List<?>)
fieldObject).toArray();
+ }
+ DataType dataType =
fieldValue.getField().getDataType();
+ if (dataType.getFieldType() ==
RecordFieldType.CHOICE) {
+ DataType chosen = null;
+ if (fieldObject != null) {
+ chosen =
DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType);
+ }
+ if (chosen == null) {
+ for (final DataType possible :
((ChoiceDataType) dataType).getPossibleSubTypes()) {
+ if (possible.getFieldType() ==
RecordFieldType.ARRAY) {
+ if (((ArrayDataType)
possible).getElementType().getFieldType() == RecordFieldType.RECORD) {
+ chosen = possible;
+ break;
+ }
+ if (chosen == null) {
+ chosen = possible;
+ }
+ }
}
-
+ }
+ if (chosen != null) {
+ dataType = chosen;
}
}
- final WriteResult writeResult =
recordSetWriter.finishRecordSet();
-
- try {
- recordSetWriter.close();
- } catch (final IOException ioe) {
- getLogger().warn("Failed to close Writer for
{}", outFlowFile);
+ if (!(dataType instanceof ArrayDataType) ||
fieldObject == null) {
+ getLogger().debug("The record path {} is
matching a field of type {} when the type ARRAY is expected.",
recordPath.getPath(), dataType.getFieldType());
+ continue;
}
- final Map<String, String> attributes = new
HashMap<>();
- writeCount.set(writeResult.getRecordCount());
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
recordSetWriter.getMimeType());
- attributes.putAll(writeResult.getAttributes());
-
session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
- }
+ if (isSplitMode) {
+ final Object[] items = (Object[]) fieldObject;
+ for (final Object item : items) {
+ fieldValue.updateValue(new Object[]{item});
+ recordSetWriter.write(record);
+ }
+ } else {
+ final ArrayDataType arrayDataType =
(ArrayDataType) dataType;
+ final DataType elementType =
arrayDataType.getElementType();
+
+ if (elementType.getFieldType() !=
RecordFieldType.RECORD) {
+ getLogger().debug("The record path {} is
matching an array field with values of type {} when the type RECORD is
expected.",
+ recordPath.getPath(),
elementType.getFieldType());
+ continue;
+ }
- } catch (final SchemaNotFoundException |
MalformedRecordException e) {
- throw new ProcessException("Could not parse incoming
data: " + e.getLocalizedMessage(), e);
+ final Object[] records = (Object[])
fieldObject;
+ for (final Object elementRecord : records) {
+ if (elementRecord == null) {
+ continue;
+ }
+
+ final Record recordToWrite = (Record)
elementRecord;
+
+ if (addParentFields) {
+
recordToWrite.incorporateSchema(writeSchema);
+
recursivelyAddParentFields(recordToWrite, fieldValue);
+ }
+
+ recordSetWriter.write(recordToWrite);
+ }
+ }
+ }
}
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
index 93594d0111..6801a36ff6 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
@@ -437,6 +437,96 @@ public class TestForkRecord {
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).getFirst().assertAttributeEquals("record.count",
"6");
}
+ @Test
+ public void testExtractWithParentFieldsAndInferredSchema() throws
Exception {
+ TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("record-reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("record-writer", jsonWriter);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.enableControllerService(jsonWriter);
+
+ runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+ runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+ runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+ runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+ runner.setProperty("bankAccounts", "/bankAccounts");
+
+
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+ runner.run();
+
+ runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+ final String expectedOutput = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json")));
+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
"5");
+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+ }
+
+ @Test
+ public void testExtractFieldsAndInferredSchema() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("record-reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("record-writer", jsonWriter);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.enableControllerService(jsonWriter);
+
+ runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+ runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+ runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+ runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false");
+ runner.setProperty("address", "/address");
+
+
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+ runner.run();
+
+ runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+ final String expectedOutput = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-without-parents.json")));
+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
"5");
+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+ }
+
+ @Test
+ public void
testExtractFieldsWithParentsAndFieldConflictAndInferredSchema() throws
Exception {
+ TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("record-reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("record-writer", jsonWriter);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.enableControllerService(jsonWriter);
+
+ runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+ runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+ runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+ runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+ runner.setProperty("address", "/address");
+
+
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+ runner.run();
+
+ runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+ final String expectedOutput = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-with-parents.json")));
+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
"5");
+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+ }
+
private class JsonRecordReader extends AbstractControllerService
implements RecordReaderFactory {
private static final JsonParserFactory jsonParserFactory = new
JsonParserFactory();
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
new file mode 100644
index 0000000000..c7839c3125
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
@@ -0,0 +1,116 @@
+[
+ {
+ "id": 1,
+ "name": {
+ "last": "Doe",
+ "first": "John"
+ },
+ "address": [
+ {
+ "id": "home",
+ "street": "1 nifi street",
+ "city": "nifi city"
+ }
+ ],
+ "bankAccounts": [
+ {
+ "bankID": "OneBank",
+ "IBAN": "OneIBAN",
+ "last5Transactions": [
+ {
+ "comment": "food",
+ "amount": "-450"
+ }
+ ]
+ }
+ ]
+ }, {
+ "id": 2,
+ "name": {
+ "last": "Smith",
+ "first": "John"
+ },
+ "address": [null],
+ "bankAccounts": null
+ }, {
+ "id": 3,
+ "name": {
+ "last": "Smith",
+ "first": "Jane"
+ },
+ "address": [
+ {
+ "id": "home",
+ "street": "1 nifi street",
+ "city": "nifi city"
+ }, {
+ "id": "work",
+ "street": "1 nifi avenue",
+ "city": "apache city"
+ }
+ ],
+ "bankAccounts": [
+ {
+ "bankID": "nifi bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": null
+ }, {
+ "bankID": "apache bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": [
+ {
+ "comment": "gas station",
+ "amount": "-45"
+ }, {
+ "comment": "hair cut",
+ "amount": "-19"
+ }
+ ]
+ }
+ ]
+ }, {
+ "id": 4,
+ "name": {
+ "last": "Clark",
+ "first": "Jane"
+ },
+ "address": [
+ {
+ "id": "home",
+ "street": "10 nifi street",
+ "city": "nifi city"
+ }, {
+ "id": "work",
+ "street": "10 nifi avenue",
+ "city": "apache city"
+ }
+ ],
+ "bankAccounts": [
+ {
+ "bankID": "nifi bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": [
+ {
+ "comment": "gift",
+ "amount": "+100"
+ }, {
+ "comment": "flights",
+ "amount": "-190"
+ }
+ ]
+ }, {
+ "bankID": "apache bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": [
+ {
+ "comment": "nifi tshirt",
+ "amount": "0"
+ }, {
+ "comment": "theatre",
+ "amount": "-19"
+ }
+ ]
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json
new file mode 100644
index 0000000000..cea9a203ab
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json
@@ -0,0 +1,121 @@
+[ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city",
+ "name" : {
+ "last" : "Doe",
+ "first" : "John"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "OneBank",
+ "IBAN" : "OneIBAN",
+ "last5Transactions" : [ {
+ "comment" : "food",
+ "amount" : "-450"
+ } ]
+ } ]
+}, {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city",
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : null
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gas station",
+ "amount" : "-45"
+ }, {
+ "comment" : "hair cut",
+ "amount" : "-19"
+ } ]
+ } ]
+}, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city",
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : null
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gas station",
+ "amount" : "-45"
+ }, {
+ "comment" : "hair cut",
+ "amount" : "-19"
+ } ]
+ } ]
+}, {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city",
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gift",
+ "amount" : "+100"
+ }, {
+ "comment" : "flights",
+ "amount" : "-190"
+ } ]
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "nifi tshirt",
+ "amount" : "0"
+ }, {
+ "comment" : "theatre",
+ "amount" : "-19"
+ } ]
+ } ]
+}, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city",
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gift",
+ "amount" : "+100"
+ }, {
+ "comment" : "flights",
+ "amount" : "-190"
+ } ]
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "nifi tshirt",
+ "amount" : "0"
+ }, {
+ "comment" : "theatre",
+ "amount" : "-19"
+ } ]
+ } ]
+} ]
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
new file mode 100644
index 0000000000..34c1836cab
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
@@ -0,0 +1,21 @@
+[ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+}, {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+}, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city"
+}, {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city"
+}, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city"
+} ]
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
new file mode 100644
index 0000000000..6ebc4eeee0
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
@@ -0,0 +1,108 @@
+[ {
+ "bankID" : "OneBank",
+ "IBAN" : "OneIBAN",
+ "last5Transactions" : [ {
+ "comment" : "food",
+ "amount" : "-450"
+ } ],
+ "id" : 1,
+ "name" : {
+ "last" : "Doe",
+ "first" : "John"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+ } ]
+}, {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : null,
+ "id" : 3,
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city"
+ } ]
+}, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gas station",
+ "amount" : "-45"
+ }, {
+ "comment" : "hair cut",
+ "amount" : "-19"
+ } ],
+ "id" : 3,
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city"
+ } ]
+}, {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gift",
+ "amount" : "+100"
+ }, {
+ "comment" : "flights",
+ "amount" : "-190"
+ } ],
+ "id" : 4,
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city"
+ } ]
+}, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "nifi tshirt",
+ "amount" : "0"
+ }, {
+ "comment" : "theatre",
+ "amount" : "-19"
+ } ],
+ "id" : 4,
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city"
+ } ]
+} ]
\ No newline at end of file