This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ece597b523 NIFI-8269 - Add support for schema inference in ForkRecord processor when extracting array records
ece597b523 is described below

commit ece597b523ecdf3113077b6412115ba42ad67fa3
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Aug 3 19:54:02 2025 +0200

    NIFI-8269 - Add support for schema inference in ForkRecord processor when extracting array records
    
    * only compute writer schema from reader schema if writer is configured to inherit the schema from the reader
---
 .../nifi-standard-processors/pom.xml               |   4 +
 .../nifi/processors/standard/ForkRecord.java       | 245 +++++++++++++++------
 .../nifi/processors/standard/TestForkRecord.java   |  90 ++++++++
 .../input/complex-input-json-for-inference.json    | 116 ++++++++++
 .../output/extract-address-with-parents.json       | 121 ++++++++++
 .../output/extract-address-without-parents.json    |  21 ++
 .../output/extract-bank-accounts-with-parents.json | 108 +++++++++
 7 files changed, 640 insertions(+), 65 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 98f308d340..d15f012dce 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -618,6 +618,10 @@
                         
<exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
                         
<exclude>src/test/resources/TestExtractRecordSchema/name_age_schema.avsc</exclude>
                         
<exclude>src/test/resources/TestForkRecord/input/complex-input-json.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/output/extract-address-without-parents.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/output/extract-address-with-parents.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json</exclude>
                         
<exclude>src/test/resources/TestForkRecord/output/extract-transactions.json</exclude>
                         
<exclude>src/test/resources/TestForkRecord/output/split-address.json</exclude>
                         
<exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude>
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
index 771979b8df..1646182e12 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
@@ -54,12 +54,17 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -68,6 +73,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -244,103 +250,212 @@ public class ForkRecord extends AbstractProcessor {
                 public void process(final InputStream in) throws IOException {
                     try (final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, original.getSize(), 
getLogger())) {
 
-                        final RecordSchema writeSchema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
-                        final OutputStream out = session.write(outFlowFile);
+                        final Record firstRecord = reader.nextRecord();
 
-                        try (final RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) {
+                        final RecordSchema readerSchema = reader.getSchema();
+                        final RecordSchema configuredWriterSchema = 
writerFactory.getSchema(originalAttributes, readerSchema);
+
+                        // we compute the write schema only if the writer is 
configured to inherit the
+                        // reader schema
+                        final RecordSchema writeSchema;
+                        if (configuredWriterSchema == readerSchema) {
+                            final RecordSchema derivedSchema = 
determineWriteSchema(firstRecord, readerSchema);
+                            writeSchema = 
writerFactory.getSchema(originalAttributes, derivedSchema);
+                        } else {
+                            writeSchema = configuredWriterSchema;
+                        }
+
+                        try (final OutputStream out = 
session.write(outFlowFile);
+                                final RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) {
 
                             recordSetWriter.beginRecordSet();
 
-                            // we read each record of the input flow file
+                            if (firstRecord != null) {
+                                writeForkedRecords(firstRecord, 
recordSetWriter, writeSchema);
+                                readCount.incrementAndGet();
+                            }
+
                             Record record;
                             while ((record = reader.nextRecord()) != null) {
-
                                 readCount.incrementAndGet();
+                                writeForkedRecords(record, recordSetWriter, 
writeSchema);
+                            }
 
-                                for (RecordPath recordPath : recordPaths) {
+                            final WriteResult writeResult = 
recordSetWriter.finishRecordSet();
 
-                                    // evaluate record path in each record of 
the flow file
-                                    Iterator<FieldValue> it = 
recordPath.evaluate(record).getSelectedFields().iterator();
+                            try {
+                                recordSetWriter.close();
+                            } catch (final IOException ioe) {
+                                getLogger().warn("Failed to close Writer for 
{}", outFlowFile);
+                            }
 
-                                    while (it.hasNext()) {
-                                        FieldValue fieldValue = it.next();
-                                        RecordFieldType fieldType = 
fieldValue.getField().getDataType().getFieldType();
+                            final Map<String, String> attributes = new 
HashMap<>();
+                            writeCount.set(writeResult.getRecordCount());
+                            attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                            attributes.put(CoreAttributes.MIME_TYPE.key(), 
recordSetWriter.getMimeType());
+                            attributes.putAll(writeResult.getAttributes());
+                            
session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
+                        }
 
-                                        // we want to have an array here, 
nothing else allowed
-                                        if (fieldType != 
RecordFieldType.ARRAY) {
-                                            getLogger().debug("The record path 
{} is matching a field of type {} when the type ARRAY is expected.", 
recordPath.getPath(), fieldType);
-                                            continue;
+                    } catch (final SchemaNotFoundException | 
MalformedRecordException e) {
+                        throw new ProcessException("Could not parse incoming 
data: " + e.getLocalizedMessage(), e);
+                    }
+                }
+
+                private RecordSchema determineWriteSchema(final Record 
firstRecord, final RecordSchema readerSchema) throws SchemaNotFoundException, 
IOException {
+                    if (isSplitMode || firstRecord == null) {
+                        return readerSchema;
+                    }
+
+                    final Map<String, RecordField> fieldMap = new 
LinkedHashMap<>();
+
+                    for (RecordPath recordPath : recordPaths) {
+                        final Iterator<FieldValue> iterator = 
recordPath.evaluate(firstRecord).getSelectedFields().iterator();
+                        while (iterator.hasNext()) {
+                            final FieldValue fieldValue = iterator.next();
+                            Object fieldObject = fieldValue.getValue();
+                            if (fieldObject instanceof List<?>) {
+                                fieldObject = ((List<?>) 
fieldObject).toArray();
+                            }
+
+                            DataType dataType = 
fieldValue.getField().getDataType();
+                            if (dataType.getFieldType() == 
RecordFieldType.CHOICE) {
+                                DataType chosen = null;
+                                if (fieldObject != null) {
+                                    chosen = 
DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType);
+                                }
+                                if (chosen == null) {
+                                    for (final DataType possible : 
((ChoiceDataType) dataType).getPossibleSubTypes()) {
+                                        if (((ArrayDataType) 
possible).getElementType().getFieldType() == RecordFieldType.RECORD) {
+                                            chosen = possible;
+                                            break;
                                         }
-                                        if (fieldValue.getValue() == null) {
-                                            getLogger().debug("The record path 
{} is matching a field the value of which is null.", recordPath.getPath());
-                                            continue;
+                                        if (chosen == null) {
+                                            chosen = possible;
                                         }
+                                    }
+                                }
+                                if (chosen != null) {
+                                    dataType = chosen;
+                                }
+                            }
 
-                                        if (isSplitMode) {
+                            if (!(dataType instanceof ArrayDataType)) {
+                                continue;
+                            }
 
-                                            Object[] items = (Object[]) 
fieldValue.getValue();
-                                            for (Object item : items) {
-                                                fieldValue.updateValue(new 
Object[]{item});
-                                                recordSetWriter.write(record);
-                                            }
+                            final ArrayDataType arrayDataType = 
(ArrayDataType) dataType;
+                            final DataType elementType = 
arrayDataType.getElementType();
 
-                                        } else {
+                            if (elementType.getFieldType() != 
RecordFieldType.RECORD) {
+                                continue;
+                            }
 
-                                            // we get the type of the elements 
of the array
-                                            final ArrayDataType arrayDataType 
= (ArrayDataType) fieldValue.getField().getDataType();
-                                            final DataType elementType = 
arrayDataType.getElementType();
+                            final RecordSchema elementSchema = 
((RecordDataType) elementType).getChildSchema();
+                            for (final RecordField elementField : 
elementSchema.getFields()) {
+                                fieldMap.put(elementField.getFieldName(), 
elementField);
+                            }
 
-                                            // we want to have records in the 
array
-                                            if (elementType.getFieldType() != 
RecordFieldType.RECORD) {
-                                                getLogger().debug("The record 
path {} is matching an array field with values of type {} when the type RECORD 
is expected.",
-                                                        recordPath.getPath(), 
elementType.getFieldType());
-                                                continue;
-                                            }
+                            if (addParentFields) {
+                                addParentFieldSchemas(fieldMap, fieldValue);
+                            }
+                        }
+                    }
 
-                                            Object[] records = (Object[]) 
fieldValue.getValue();
-                                            for (Object elementRecord : 
records) {
+                    final RecordSchema schema = new SimpleRecordSchema(new 
ArrayList<>(fieldMap.values()));
+                    return writerFactory.getSchema(originalAttributes, schema);
+                }
 
-                                                if (elementRecord == null) {
-                                                    continue;
-                                                }
+                private void addParentFieldSchemas(final Map<String, 
RecordField> fieldMap, final FieldValue fieldValue) {
+                    try {
+                        final FieldValue parentField = 
fieldValue.getParent().get();
+                        final Record parentRecord = 
fieldValue.getParentRecord().get();
 
-                                                Record recordToWrite = 
(Record) elementRecord;
+                        for (final RecordField field : 
parentRecord.getSchema().getFields()) {
+                            if 
(!field.getFieldName().equals(fieldValue.getField().getFieldName())) {
+                                fieldMap.putIfAbsent(field.getFieldName(), 
field);
+                            }
+                        }
 
-                                                if (addParentFields) {
-                                                    // in this case we want to 
recursively add the parent fields into the record to write
-                                                    // but we need to ensure 
that the Record has the appropriate schema for that
-                                                    
recordToWrite.incorporateSchema(writeSchema);
-                                                    
recursivelyAddParentFields(recordToWrite, fieldValue);
-                                                }
+                        addParentFieldSchemas(fieldMap, parentField);
+                    } catch (NoSuchElementException e) {
+                        return; // No parent field, nothing to do
+                    }
+                }
 
-                                                
recordSetWriter.write(recordToWrite);
-                                            }
+                private void writeForkedRecords(final Record record, final 
RecordSetWriter recordSetWriter, final RecordSchema writeSchema) throws 
IOException {
+                    for (RecordPath recordPath : recordPaths) {
+                        final Iterator<FieldValue> it = 
recordPath.evaluate(record).getSelectedFields().iterator();
 
-                                        }
+                        while (it.hasNext()) {
+                            final FieldValue fieldValue = it.next();
+                            Object fieldObject = fieldValue.getValue();
+                            if (fieldObject instanceof List<?>) {
+                                fieldObject = ((List<?>) 
fieldObject).toArray();
+                            }
 
+                            DataType dataType = 
fieldValue.getField().getDataType();
+                            if (dataType.getFieldType() == 
RecordFieldType.CHOICE) {
+                                DataType chosen = null;
+                                if (fieldObject != null) {
+                                    chosen = 
DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType);
+                                }
+                                if (chosen == null) {
+                                    for (final DataType possible : 
((ChoiceDataType) dataType).getPossibleSubTypes()) {
+                                        if (possible.getFieldType() == 
RecordFieldType.ARRAY) {
+                                            if (((ArrayDataType) 
possible).getElementType().getFieldType() == RecordFieldType.RECORD) {
+                                                chosen = possible;
+                                                break;
+                                            }
+                                            if (chosen == null) {
+                                                chosen = possible;
+                                            }
+                                        }
                                     }
-
+                                }
+                                if (chosen != null) {
+                                    dataType = chosen;
                                 }
                             }
 
-                            final WriteResult writeResult = 
recordSetWriter.finishRecordSet();
-
-                            try {
-                                recordSetWriter.close();
-                            } catch (final IOException ioe) {
-                                getLogger().warn("Failed to close Writer for 
{}", outFlowFile);
+                            if (!(dataType instanceof ArrayDataType) || 
fieldObject == null) {
+                                getLogger().debug("The record path {} is 
matching a field of type {} when the type ARRAY is expected.", 
recordPath.getPath(), dataType.getFieldType());
+                                continue;
                             }
 
-                            final Map<String, String> attributes = new 
HashMap<>();
-                            writeCount.set(writeResult.getRecordCount());
-                            attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                            attributes.put(CoreAttributes.MIME_TYPE.key(), 
recordSetWriter.getMimeType());
-                            attributes.putAll(writeResult.getAttributes());
-                            
session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
-                        }
+                            if (isSplitMode) {
+                                final Object[] items = (Object[]) fieldObject;
+                                for (final Object item : items) {
+                                    fieldValue.updateValue(new Object[]{item});
+                                    recordSetWriter.write(record);
+                                }
+                            } else {
+                                final ArrayDataType arrayDataType = 
(ArrayDataType) dataType;
+                                final DataType elementType = 
arrayDataType.getElementType();
+
+                                if (elementType.getFieldType() != 
RecordFieldType.RECORD) {
+                                    getLogger().debug("The record path {} is 
matching an array field with values of type {} when the type RECORD is 
expected.",
+                                            recordPath.getPath(), 
elementType.getFieldType());
+                                    continue;
+                                }
 
-                    } catch (final SchemaNotFoundException | 
MalformedRecordException e) {
-                        throw new ProcessException("Could not parse incoming 
data: " + e.getLocalizedMessage(), e);
+                                final Object[] records = (Object[]) 
fieldObject;
+                                for (final Object elementRecord : records) {
+                                    if (elementRecord == null) {
+                                        continue;
+                                    }
+
+                                    final Record recordToWrite = (Record) 
elementRecord;
+
+                                    if (addParentFields) {
+                                        
recordToWrite.incorporateSchema(writeSchema);
+                                        
recursivelyAddParentFields(recordToWrite, fieldValue);
+                                    }
+
+                                    recordSetWriter.write(recordToWrite);
+                                }
+                            }
+                        }
                     }
                 }
 
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
index 93594d0111..6801a36ff6 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
@@ -437,6 +437,96 @@ public class TestForkRecord {
         
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).getFirst().assertAttributeEquals("record.count",
 "6");
     }
 
+    @Test
+    public void testExtractWithParentFieldsAndInferredSchema() throws 
Exception {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("record-reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("record-writer", jsonWriter);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.enableControllerService(jsonWriter);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+        runner.setProperty("bankAccounts", "/bankAccounts");
+
+        
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+        runner.run();
+
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final String expectedOutput = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json")));
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
 "5");
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+    }
+
+    @Test
+    public void testExtractFieldsAndInferredSchema() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("record-reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("record-writer", jsonWriter);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.enableControllerService(jsonWriter);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false");
+        runner.setProperty("address", "/address");
+
+        
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+        runner.run();
+
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final String expectedOutput = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-without-parents.json")));
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
 "5");
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+    }
+
+    @Test
+    public void 
testExtractFieldsWithParentsAndFieldConflictAndInferredSchema() throws 
Exception {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("record-reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("record-writer", jsonWriter);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.enableControllerService(jsonWriter);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+        runner.setProperty("address", "/address");
+
+        
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+        runner.run();
+
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final String expectedOutput = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-with-parents.json")));
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
 "5");
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+    }
+
     private class JsonRecordReader extends AbstractControllerService 
implements RecordReaderFactory {
 
         private static final JsonParserFactory jsonParserFactory = new 
JsonParserFactory();
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
new file mode 100644
index 0000000000..c7839c3125
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
@@ -0,0 +1,116 @@
+[
+  {
+    "id": 1,
+    "name": {
+      "last": "Doe",
+      "first": "John"
+    },
+    "address": [
+      {
+        "id": "home",
+        "street": "1 nifi street",
+        "city": "nifi city"
+      }
+    ],
+    "bankAccounts": [
+      {
+        "bankID": "OneBank",
+        "IBAN": "OneIBAN",
+        "last5Transactions": [
+          {
+            "comment": "food",
+            "amount": "-450"
+          }
+        ]
+      }
+    ]
+  }, {
+    "id": 2,
+    "name": {
+      "last": "Smith",
+      "first": "John"
+    },
+    "address": [null],
+    "bankAccounts": null
+  }, {
+    "id": 3,
+    "name": {
+      "last": "Smith",
+      "first": "Jane"
+    },
+    "address": [
+      {
+        "id": "home",
+        "street": "1 nifi street",
+        "city": "nifi city"
+      }, {
+        "id": "work",
+        "street": "1 nifi avenue",
+        "city": "apache city"
+      }
+    ],
+    "bankAccounts": [
+      {
+        "bankID": "nifi bank",
+        "IBAN": "myIBAN",
+        "last5Transactions": null
+      }, {
+        "bankID": "apache bank",
+        "IBAN": "myIBAN",
+        "last5Transactions": [
+          {
+            "comment": "gas station",
+            "amount": "-45"
+          }, {
+            "comment": "hair cut",
+            "amount": "-19"
+          }
+        ]
+      }
+    ]
+  }, {
+    "id": 4,
+    "name": {
+      "last": "Clark",
+      "first": "Jane"
+    },
+    "address": [
+      {
+        "id": "home",
+        "street": "10 nifi street",
+        "city": "nifi city"
+      }, {
+        "id": "work",
+        "street": "10 nifi avenue",
+        "city": "apache city"
+      }
+    ],
+    "bankAccounts": [
+      {
+        "bankID": "nifi bank",
+        "IBAN": "myIBAN",
+        "last5Transactions": [
+          {
+            "comment": "gift",
+            "amount": "+100"
+          }, {
+            "comment": "flights",
+            "amount": "-190"
+          }
+        ]
+      }, {
+        "bankID": "apache bank",
+        "IBAN": "myIBAN",
+        "last5Transactions": [
+          {
+            "comment": "nifi tshirt",
+            "amount": "0"
+          }, {
+            "comment": "theatre",
+            "amount": "-19"
+          }
+        ]
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json
new file mode 100644
index 0000000000..cea9a203ab
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json
@@ -0,0 +1,121 @@
+[ {
+  "id" : "home",
+  "street" : "1 nifi street",
+  "city" : "nifi city",
+  "name" : {
+    "last" : "Doe",
+    "first" : "John"
+  },
+  "bankAccounts" : [ {
+    "bankID" : "OneBank",
+    "IBAN" : "OneIBAN",
+    "last5Transactions" : [ {
+      "comment" : "food",
+      "amount" : "-450"
+    } ]
+  } ]
+}, {
+  "id" : "home",
+  "street" : "1 nifi street",
+  "city" : "nifi city",
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : null
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gas station",
+      "amount" : "-45"
+    }, {
+      "comment" : "hair cut",
+      "amount" : "-19"
+    } ]
+  } ]
+}, {
+  "id" : "work",
+  "street" : "1 nifi avenue",
+  "city" : "apache city",
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : null
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gas station",
+      "amount" : "-45"
+    }, {
+      "comment" : "hair cut",
+      "amount" : "-19"
+    } ]
+  } ]
+}, {
+  "id" : "home",
+  "street" : "10 nifi street",
+  "city" : "nifi city",
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gift",
+      "amount" : "+100"
+    }, {
+      "comment" : "flights",
+      "amount" : "-190"
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "nifi tshirt",
+      "amount" : "0"
+    }, {
+      "comment" : "theatre",
+      "amount" : "-19"
+    } ]
+  } ]
+}, {
+  "id" : "work",
+  "street" : "10 nifi avenue",
+  "city" : "apache city",
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gift",
+      "amount" : "+100"
+    }, {
+      "comment" : "flights",
+      "amount" : "-190"
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "nifi tshirt",
+      "amount" : "0"
+    }, {
+      "comment" : "theatre",
+      "amount" : "-19"
+    } ]
+  } ]
+} ]
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
new file mode 100644
index 0000000000..34c1836cab
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
@@ -0,0 +1,21 @@
+[ {
+  "id" : "home",
+  "street" : "1 nifi street",
+  "city" : "nifi city"
+}, {
+  "id" : "home",
+  "street" : "1 nifi street",
+  "city" : "nifi city"
+}, {
+  "id" : "work",
+  "street" : "1 nifi avenue",
+  "city" : "apache city"
+}, {
+  "id" : "home",
+  "street" : "10 nifi street",
+  "city" : "nifi city"
+}, {
+  "id" : "work",
+  "street" : "10 nifi avenue",
+  "city" : "apache city"
+} ]
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
new file mode 100644
index 0000000000..6ebc4eeee0
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
@@ -0,0 +1,108 @@
+[ {
+  "bankID" : "OneBank",
+  "IBAN" : "OneIBAN",
+  "last5Transactions" : [ {
+    "comment" : "food",
+    "amount" : "-450"
+  } ],
+  "id" : 1,
+  "name" : {
+    "last" : "Doe",
+    "first" : "John"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "1 nifi street",
+    "city" : "nifi city"
+  } ]
+}, {
+  "bankID" : "nifi bank",
+  "IBAN" : "myIBAN",
+  "last5Transactions" : null,
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "1 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "1 nifi avenue",
+    "city" : "apache city"
+  } ]
+}, {
+  "bankID" : "apache bank",
+  "IBAN" : "myIBAN",
+  "last5Transactions" : [ {
+    "comment" : "gas station",
+    "amount" : "-45"
+  }, {
+    "comment" : "hair cut",
+    "amount" : "-19"
+  } ],
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "1 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "1 nifi avenue",
+    "city" : "apache city"
+  } ]
+}, {
+  "bankID" : "nifi bank",
+  "IBAN" : "myIBAN",
+  "last5Transactions" : [ {
+    "comment" : "gift",
+    "amount" : "+100"
+  }, {
+    "comment" : "flights",
+    "amount" : "-190"
+  } ],
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "10 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "10 nifi avenue",
+    "city" : "apache city"
+  } ]
+}, {
+  "bankID" : "apache bank",
+  "IBAN" : "myIBAN",
+  "last5Transactions" : [ {
+    "comment" : "nifi tshirt",
+    "amount" : "0"
+  }, {
+    "comment" : "theatre",
+    "amount" : "-19"
+  } ],
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "10 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "10 nifi avenue",
+    "city" : "apache city"
+  } ]
+} ]
\ No newline at end of file


Reply via email to