Repository: nifi
Updated Branches:
  refs/heads/master 3b3d6d4eb -> 397e88c85


NIFI-4227 - add a ForkRecord processor
Added split/extract modes, unit tests, and additional details

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be0ed704
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be0ed704
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be0ed704

Branch: refs/heads/master
Commit: be0ed704231fb51ed233b58575e1378fc2c93e1d
Parents: 3b3d6d4
Author: Pierre Villard <[email protected]>
Authored: Tue Jul 25 17:41:49 2017 +0200
Committer: Mark Payne <[email protected]>
Committed: Thu May 24 15:55:17 2018 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |  10 +
 .../nifi/processors/standard/ForkRecord.java    | 378 +++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 481 ++++++++++++++++++
 .../processors/standard/TestForkRecord.java     | 486 +++++++++++++++++++
 .../input/complex-input-json.json               |  99 ++++
 .../output/extract-transactions.json            |  49 ++
 .../TestForkRecord/output/split-address.json    | 125 +++++
 .../output/split-transactions.json              | 181 +++++++
 .../TestForkRecord/schema/extract-schema.avsc   |  21 +
 .../resources/TestForkRecord/schema/schema.avsc |  56 +++
 .../single-element-nested-array-strings.json    |  10 +
 .../single-element-nested-array.json            |  16 +
 .../single-element-nested-nested-array.json     |  32 ++
 .../two-elements-nested-nested-array-null.json  |  50 ++
 15 files changed, 1995 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 6052daf..6f1a77d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -483,6 +483,16 @@
                         
<exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>
                         <exclude>src/test/resources/xxe_template.xml</exclude>
                         
<exclude>src/test/resources/xxe_from_report.xml</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/single-element-nested-array-strings.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/single-element-nested-array.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/single-element-nested-nested-array.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/input/complex-input-json.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/output/extract-transactions.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/output/split-address.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/schema/extract-schema.avsc</exclude>
+                        
<exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
new file mode 100644
index 0000000..2c28603
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"fork", "record", "content", "array", "stream", "event"})
+@CapabilityDescription("This processor allows the user to fork a record into 
multiple records. The user must specify at least one "
+        + "Record Path, as a dynamic property, pointing to a field of type 
ARRAY containing RECORD objects. The processor accepts "
+        + "two modes: 'split' and 'extract'. In both modes, there is one 
record generated per element contained in the designated "
+        + "array. In the 'split' mode, each generated record will preserve the 
same schema as given in the input but the array "
+        + "will contain only one element. In the 'extract' mode, the element 
of the array must be of record type and will be the "
+        + "generated record. Additionally, in the 'extract' mode, it is 
possible to specify if each generated record should contain "
+        + "all the fields of the parent records from the root level to the 
extracted record. This assumes that the fields to add in "
+        + "the record are defined in the schema of the Record Writer 
controller service. See examples in the additional details "
+        + "documentation of this processor.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.count", description = "The generated 
FlowFile will have a 'record.count' attribute indicating "
+            + "the number of records that were written to the FlowFile."),
+    @WritesAttribute(attribute = "mime.type", description = "The MIME Type 
indicated by the Record Writer"),
+    @WritesAttribute(attribute = "<Attributes from Record Writer>", 
description = "Any Attribute that the configured Record Writer "
+            + "returns will be added to the FlowFile.")
+})
+public class ForkRecord extends AbstractProcessor {
+
+    private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
+
+    static final AllowableValue MODE_EXTRACT = new AllowableValue("extract", 
"Extract",
+            "Generated records will preserve the input schema and will contain 
a one-element array");
+    static final AllowableValue MODE_SPLIT = new AllowableValue("split", 
"Split",
+            "Generated records will be the elements of the array");
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for reading 
incoming data")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing 
out the records")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("fork-mode")
+            .displayName("Mode")
+            .description("Specifies the forking mode of the processor")
+            .allowableValues(MODE_EXTRACT, MODE_SPLIT)
+            .defaultValue(MODE_SPLIT.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new 
PropertyDescriptor.Builder()
+            .name("include-parent-fields")
+            .displayName("Include Parent Fields")
+            .description("This parameter is only valid with the 'extract' 
mode. If set to true, all the fields "
+                    + "from the root level to the given array will be added as 
fields of each element of the "
+                    + "array to fork.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_FORK = new Relationship.Builder()
+            .name("fork")
+            .description("The FlowFiles containing the forked records will be 
routed to this relationship")
+            .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFiles will be routed to this 
relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("In case a FlowFile generates an error during the 
fork operation, it will be routed to this relationship")
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(MODE);
+        properties.add(INCLUDE_PARENT_FIELDS);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_FORK);
+        return relationships;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+        Validator validator = new RecordPathValidator();
+
+        Map<PropertyDescriptor, String> processorProperties = 
validationContext.getProperties();
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
processorProperties.entrySet()) {
+            PropertyDescriptor property = entry.getKey();
+            if (property.isDynamic() && 
property.isExpressionLanguageSupported()) {
+                String dynamicValue = 
validationContext.getProperty(property).getValue();
+                
if(!validationContext.isExpressionLanguagePresent(dynamicValue)) {
+                    results.add(validator.validate(property.getDisplayName(), 
dynamicValue, validationContext));
+                }
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final List<RecordPath> recordPaths = new ArrayList<RecordPath>();
+        Map<PropertyDescriptor, String> processorProperties = 
context.getProperties();
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
processorProperties.entrySet()) {
+            PropertyDescriptor property = entry.getKey();
+            if (property.isDynamic() && 
property.isExpressionLanguageSupported()) {
+                String path = 
context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
+                if(StringUtils.isNotBlank(path)) {
+                    
recordPaths.add(recordPathCache.getCompiled(context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue()));
+                }
+            }
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean addParentFields = 
context.getProperty(INCLUDE_PARENT_FIELDS).asBoolean();
+        final boolean isSplitMode = 
context.getProperty(MODE).getValue().equals(MODE_SPLIT.getValue());
+
+        final FlowFile original = flowFile;
+        final Map<String, String> originalAttributes = 
original.getAttributes();
+
+        final FlowFile outFlowFile = session.create(original);
+        final AtomicInteger readCount = new AtomicInteger(0);
+        final AtomicInteger writeCount = new AtomicInteger(0);
+
+        try {
+
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
+
+                        final RecordSchema writeSchema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final OutputStream out = session.write(outFlowFile);
+
+                        try (final RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), writeSchema, out)) {
+
+                            recordSetWriter.beginRecordSet();
+
+                            // we read each record of the input flow file
+                            Record record;
+                            while ((record = reader.nextRecord()) != null) {
+
+                                readCount.incrementAndGet();
+
+                                for(RecordPath recordPath : recordPaths) {
+
+                                    // evaluate record path in each record of 
the flow file
+                                    Iterator<FieldValue> it = 
recordPath.evaluate(record).getSelectedFields().iterator();
+
+                                    while(it.hasNext()) {
+                                        FieldValue fieldValue = it.next();
+                                        RecordFieldType fieldType = 
fieldValue.getField().getDataType().getFieldType();
+
+                                        // we want to have an array here, 
nothing else allowed
+                                        if(fieldType != RecordFieldType.ARRAY) 
{
+                                            getLogger().debug("The record path 
" + recordPath.getPath() + " is matching a field "
+                                                    + "of type " + fieldType + 
" when the type ARRAY is expected.");
+                                            continue;
+                                        }
+
+                                        if(isSplitMode) {
+
+                                            Object[] items = (Object[]) 
fieldValue.getValue();
+                                            for (Object item : items) {
+                                                fieldValue.updateValue(new 
Object[]{item});
+                                                recordSetWriter.write(record);
+                                            }
+
+                                        } else {
+
+                                            // we get the type of the elements 
of the array
+                                            final ArrayDataType arrayDataType 
= (ArrayDataType) fieldValue.getField().getDataType();
+                                            final DataType elementType = 
arrayDataType.getElementType();
+
+                                            // we want to have records in the 
array
+                                            if(elementType.getFieldType() != 
RecordFieldType.RECORD) {
+                                                getLogger().debug("The record 
path " + recordPath.getPath() + " is matching an array field with "
+                                                        + "values of type " + 
elementType.getFieldType() + " when the type RECORD is expected.");
+                                                continue;
+                                            }
+
+                                            Object[] records = (Object[]) 
fieldValue.getValue();
+                                            for (Object elementRecord : 
records) {
+
+                                                if(elementRecord == null) {
+                                                    continue;
+                                                }
+
+                                                Record recordToWrite = 
(Record) elementRecord;
+
+                                                if(addParentFields) {
+                                                    // in this case we want to 
recursively add the parent fields into the record to write
+                                                    // but we need to ensure 
that the Record has the appropriate schema for that
+                                                    
recordToWrite.incorporateSchema(writeSchema);
+                                                    
recursivelyAddParentFields(recordToWrite, fieldValue);
+                                                }
+
+                                                
recordSetWriter.write(recordToWrite);
+                                            }
+
+                                        }
+
+                                    }
+
+                                }
+                            }
+
+                            final WriteResult writeResult = 
recordSetWriter.finishRecordSet();
+
+                            try {
+                                recordSetWriter.close();
+                            } catch (final IOException ioe) {
+                                getLogger().warn("Failed to close Writer for 
{}", new Object[] {outFlowFile});
+                            }
+
+                            final Map<String, String> attributes = new 
HashMap<>();
+                            writeCount.set(writeResult.getRecordCount());
+                            attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                            attributes.put(CoreAttributes.MIME_TYPE.key(), 
recordSetWriter.getMimeType());
+                            attributes.putAll(writeResult.getAttributes());
+                            
session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
+                        }
+
+                    } catch (final SchemaNotFoundException | 
MalformedRecordException e) {
+                        throw new ProcessException("Could not parse incoming 
data: " + e.getLocalizedMessage(), e);
+                    }
+                }
+
+                private void recursivelyAddParentFields(Record recordToWrite, 
FieldValue fieldValue) {
+                    try {
+                        // we get the parent data
+                        FieldValue parentField = fieldValue.getParent().get();
+                        Record parentRecord = 
fieldValue.getParentRecord().get();
+
+                        // for each field of the parent
+                        for (String field : 
parentRecord.getSchema().getFieldNames()) {
+                            // if and only if there is not an already existing 
field with this name
+                            // (we want to give priority to the deeper 
existing fields)
+                            if(recordToWrite.getValue(field) == null) {
+                                // Updates the value of the field with the 
given name to the given value.
+                                // If the field specified is not present in 
the schema, will do nothing.
+                                recordToWrite.setValue(field, 
parentRecord.getValue(field));
+                            }
+                        }
+
+                        // recursive call
+                        recursivelyAddParentFields(recordToWrite, parentField);
+                    } catch (NoSuchElementException e) {
+                        return;
+                    }
+                }
+            });
+
+        } catch (Exception e) {
+            getLogger().error("Failed to fork {}", new Object[] {flowFile, e});
+            session.remove(outFlowFile);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        session.adjustCounter("Records Processed", readCount.get(), false);
+        session.adjustCounter("Records Generated", writeCount.get(), false);
+        getLogger().debug("Successfully forked {} records into {} records in 
{}", new Object[] {readCount.get(), writeCount.get(), flowFile});
+        session.transfer(original, REL_ORIGINAL);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a93b37f..a15ccf5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -35,6 +35,7 @@ org.apache.nifi.processors.standard.ExtractText
 org.apache.nifi.processors.standard.FetchSFTP
 org.apache.nifi.processors.standard.FetchFile
 org.apache.nifi.processors.standard.FlattenJson
+org.apache.nifi.processors.standard.ForkRecord
 org.apache.nifi.processors.standard.GenerateFlowFile
 org.apache.nifi.processors.standard.GetFile
 org.apache.nifi.processors.standard.GetFTP

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html
new file mode 100644
index 0000000..5cf4c49
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html
@@ -0,0 +1,481 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>ForkRecord</title>
+
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+       <p>
+               ForkRecord allows the user to fork a record into multiple 
records. To do that, the user must specify
+               one or more <a 
href="../../../../../html/record-path-guide.html">RecordPaths</a> (as dynamic 
+               properties of the processor) pointing to a field of type ARRAY 
containing RECORD elements.
+       </p>
+       <p>
+               The processor accepts two modes:
+               <ul>
+                       <li>Split mode - in this mode, the generated records 
will have the same schema as the input. For 
+                       every element in the array, one record will be 
generated and the array will only contain this 
+                       element.</li>
+                       <li>Extract mode - in this mode, the generated records 
will be the elements contained in the array. 
+                       In addition, it is possible to include in each record 
all the fields of the parent records, from the root 
+                       level down to the forked record element. Note that this 
requires those fields to be defined in the 
+                       schema of the Record Writer controller service. </li>
+               </ul>
+       </p>
+       
+       <h2>Examples</h2>
+       
+       <h3>EXTRACT mode</h3>
+       
+       <p>
+               To better understand how this Processor works, we will lay out 
a few examples. For the sake of these examples, let's assume that our input
+               data is JSON formatted and looks like this:
+       </p>
+
+<code>
+<pre>
+[{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [{
+               "id": 42,
+               "balance": 4750.89
+       }, {
+               "id": 43,
+               "balance": 48212.38
+       }]
+}, 
+{
+       "id": 2,
+       "name": "Jane Doe",
+       "address": "345 My Street",
+       "city": "Her City", 
+       "state": "NY",
+       "zipCode": "22222",
+       "country": "USA",
+       "accounts": [{
+               "id": 45,
+               "balance": 6578.45
+       }, {
+               "id": 46,
+               "balance": 34567.21
+       }]
+}]
+</pre>
+</code>
+
+
+       <h4>Example 1 - Extracting without parent fields</h4>
+       
+       <p>
+               For this case, we want to create one record per 
<code>account</code> and we don't care about 
+               the other fields. We'll add a dynamic property "path" set to 
<code>/accounts</code>. The resulting 
+               flow file will contain 4 records and will look like (assuming 
the Record Writer schema is correctly set):
+       </p>
+
+<code>
+<pre>
+[{
+       "id": 42,
+       "balance": 4750.89
+}, {
+       "id": 43,
+       "balance": 48212.38
+}, {
+       "id": 45,
+       "balance": 6578.45
+}, {
+       "id": 46,
+       "balance": 34567.21
+}]
+</pre>
+</code>
+
+       
+       <h4>Example 2 - Extracting with parent fields</h4>
+       
+       <p>
+               Now, if we set the property "Include parent fields" to true, 
this will recursively include 
+               the parent fields into the output records assuming the Record 
Writer schema allows it. In 
+               case multiple fields have the same name (like we have in this 
example for <code>id</code>), 
+               the child field will take priority over all the parent 
fields sharing the same name. In 
+               this case, the <code>id</code> of the array 
<code>accounts</code> will be saved in the 
+               forked records. The resulting flow file will contain 4 records 
and will look like:
+       </p>
+
+<code>
+<pre>
+[{
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "id": 42,
+       "balance": 4750.89
+}, {
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "id": 43,
+       "balance": 48212.38
+}, {
+       "name": "Jane Doe",
+       "address": "345 My Street",
+       "city": "Her City", 
+       "state": "NY",
+       "zipCode": "22222",
+       "country": "USA",
+       "id": 45,
+       "balance": 6578.45
+}, {
+       "name": "Jane Doe",
+       "address": "345 My Street",
+       "city": "Her City", 
+       "state": "NY",
+       "zipCode": "22222",
+       "country": "USA",
+       "id": 46,
+       "balance": 34567.21
+}]
+</pre>
+</code>
+       
+       
+       <h4>Example 3 - Multi-nested arrays</h4>
+       
+       <p>
+               Now let's say that the input record contains multi-nested 
arrays like the below example:
+       </p>
+
+<code>
+<pre>
+[{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [{
+               "id": 42,
+               "balance": 4750.89,
+               "transactions": [{
+                       "id": 5,
+                       "amount": 150.31
+               },
+               {
+                       "id": 6,
+                       "amount": -15.31
+               }]
+       }, {
+               "id": 43,
+               "balance": 48212.38,
+               "transactions": [{
+                       "id": 7,
+                       "amount": 36.78
+               },
+               {
+                       "id": 8,
+                       "amount": -21.34
+               }]
+       }]
+}]
+</pre>
+</code>
+       
+       <p>
+               If we want to have one record per <code>transaction</code> for 
each <code>account</code>, then 
+               the Record Path should be set to 
<code>/accounts[*]/transactions</code>. If we have the following 
+               schema for our Record Reader:
+       </p>
+
+<code>
+<pre>
+{
+    "type" : "record",
+    "name" : "bank",
+    "fields" : [ {
+      "name" : "id",
+      "type" : "int"
+    }, {
+      "name" : "name",
+      "type" : "string"
+    }, {
+      "name" : "address",
+      "type" : "string"
+    }, {
+      "name" : "city",
+      "type" : "string"
+    }, {
+      "name" : "state",
+      "type" : "string"
+    }, {
+      "name" : "zipCode",
+      "type" : "string"
+    }, {
+      "name" : "country",
+      "type" : "string"
+    }, {
+      "name" : "accounts",
+      "type" : {
+        "type" : "array",
+        "items" : {
+          "type" : "record",
+          "name" : "accounts",
+          "fields" : [ {
+            "name" : "id",
+            "type" : "int"
+          }, {
+            "name" : "balance",
+            "type" : "double"
+          }, {
+            "name" : "transactions",
+            "type" : {
+              "type" : "array",
+              "items" : {
+                "type" : "record",
+                "name" : "transactions",
+                "fields" : [ {
+                  "name" : "id",
+                  "type" : "int"
+                }, {
+                  "name" : "amount",
+                  "type" : "double"
+                } ]
+              }
+            }
+          } ]
+        }
+      }
+    } ]
+}
+</pre>
+</code>
+       
+       <p>
+               And if we have the following schema for our Record Writer:
+       </p>
+
+<code>
+<pre>
+{
+    "type" : "record",
+    "name" : "bank",
+    "fields" : [ {
+      "name" : "id",
+      "type" : "int"
+    }, {
+      "name" : "name",
+      "type" : "string"
+    }, {
+      "name" : "address",
+      "type" : "string"
+    }, {
+      "name" : "city",
+      "type" : "string"
+    }, {
+      "name" : "state",
+      "type" : "string"
+    }, {
+      "name" : "zipCode",
+      "type" : "string"
+    }, {
+      "name" : "country",
+      "type" : "string"
+    }, {
+      "name" : "amount",
+      "type" : "double"
+    }, {
+      "name" : "balance",
+      "type" : "double"
+    } ]
+}
+</pre>
+</code>
+       
+       <p>
+               Then, if we include the parent fields, we'll have 4 records as 
below:
+       </p>
+
+<code>
+<pre>
+[ {
+  "id" : 5,
+  "name" : "John Doe",
+  "address" : "123 My Street",
+  "city" : "My City",
+  "state" : "MS",
+  "zipCode" : "11111",
+  "country" : "USA",
+  "amount" : 150.31,
+  "balance" : 4750.89
+}, {
+  "id" : 6,
+  "name" : "John Doe",
+  "address" : "123 My Street",
+  "city" : "My City",
+  "state" : "MS",
+  "zipCode" : "11111",
+  "country" : "USA",
+  "amount" : -15.31,
+  "balance" : 4750.89
+}, {
+  "id" : 7,
+  "name" : "John Doe",
+  "address" : "123 My Street",
+  "city" : "My City",
+  "state" : "MS",
+  "zipCode" : "11111",
+  "country" : "USA",
+  "amount" : 36.78,
+  "balance" : 48212.38
+}, {
+  "id" : 8,
+  "name" : "John Doe",
+  "address" : "123 My Street",
+  "city" : "My City",
+  "state" : "MS",
+  "zipCode" : "11111",
+  "country" : "USA",
+  "amount" : -21.34,
+  "balance" : 48212.38
+} ]
+</pre>
+</code>
+       
+       <h3>SPLIT mode</h3>
+       
+               <h4>Example</h4>
+       
+       <p>
+               Assuming we have the below data and we added a property "path" 
set to <code>/accounts</code>:
+       </p>
+
+<code>
+<pre>
+[{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [{
+               "id": 42,
+               "balance": 4750.89
+       }, {
+               "id": 43,
+               "balance": 48212.38
+       }]
+}, 
+{
+       "id": 2,
+       "name": "Jane Doe",
+       "address": "345 My Street",
+       "city": "Her City", 
+       "state": "NY",
+       "zipCode": "22222",
+       "country": "USA",
+       "accounts": [{
+               "id": 45,
+               "balance": 6578.45
+       }, {
+               "id": 46,
+               "balance": 34567.21
+       }]
+}]
+</pre>
+</code>
+
+       <p>
+               Then we'll get 4 records as below:
+       </p>
+
+<code>
+<pre>
+[{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [{
+               "id": 42,
+               "balance": 4750.89
+       }]
+},
+{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [{
+               "id": 43,
+               "balance": 48212.38
+       }]
+},
+{
+       "id": 2,
+       "name": "Jane Doe",
+       "address": "345 My Street",
+       "city": "Her City", 
+       "state": "NY",
+       "zipCode": "22222",
+       "country": "USA",
+       "accounts": [{
+               "id": 45,
+               "balance": 6578.45
+       }]
+}, 
+{
+       "id": 2,
+       "name": "Jane Doe",
+       "address": "345 My Street",
+       "city": "Her City", 
+       "state": "NY",
+       "zipCode": "22222",
+       "country": "USA",
+       "accounts": [{
+               "id": 46,
+               "balance": 34567.21
+       }]
+}]
+</pre>
+</code>
+
+       </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
new file mode 100644
index 0000000..562b8f0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestForkRecord {
+
+    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+    private final String timestampFormat = 
RecordFieldType.TIMESTAMP.getDefaultFormat();
+
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("address", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("city", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("state", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("zipCode", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("country", 
RecordFieldType.STRING.getDataType()));
+        return fields;
+    }
+
+    private RecordSchema getAccountSchema() {
+        final List<RecordField> accountFields = new ArrayList<>();
+        accountFields.add(new RecordField("id", 
RecordFieldType.INT.getDataType()));
+        accountFields.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        return new SimpleRecordSchema(accountFields);
+    }
+
+    private RecordSchema getAccountWithTransactionSchema() {
+        final List<RecordField> accountFields = new ArrayList<>();
+        accountFields.add(new RecordField("id", 
RecordFieldType.INT.getDataType()));
+        accountFields.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+
+        final DataType transactionRecordType = 
RecordFieldType.RECORD.getRecordDataType(getTransactionSchema());
+        final DataType transactionsType = 
RecordFieldType.ARRAY.getArrayDataType(transactionRecordType);
+        accountFields.add(new RecordField("transactions", transactionsType));
+
+        return new SimpleRecordSchema(accountFields);
+    }
+
+    private RecordSchema getTransactionSchema() {
+        final List<RecordField> transactionFields = new ArrayList<>();
+        transactionFields.add(new RecordField("id", 
RecordFieldType.INT.getDataType()));
+        transactionFields.add(new RecordField("amount", 
RecordFieldType.DOUBLE.getDataType()));
+        return new SimpleRecordSchema(transactionFields);
+    }
+
+    @Test
+    public void testForkExtractSimpleWithoutParentFields() throws IOException, 
MalformedRecordException, InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final JsonRecordReader readerService = new JsonRecordReader(schema);
+        final MockRecordWriter writerService = new 
CustomRecordWriter("header", false, getAccountSchema());
+
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty("my-path", "/accounts");
+
+        runner.enqueue(new 
File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath());
+        runner.run(1);
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
+        mff.assertAttributeEquals("record.count", "2");
+        mff.assertContentEquals("header\n42,4750.89\n43,48212.38\n");
+    }
+
+    @Test
+    public void testForkExtractSimpleWithParentFields() throws IOException, 
MalformedRecordException, InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<RecordField> fieldsWrite = getDefaultFields();
+        fieldsWrite.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
+
+        final JsonRecordReader readerService = new JsonRecordReader(schema);
+        final MockRecordWriter writerService = new 
CustomRecordWriter("header", false, schemaWrite);
+
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+        runner.setProperty("my-path", "/accounts");
+
+        runner.enqueue(new 
File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath());
+        runner.run(1);
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
+        mff.assertAttributeEquals("record.count", "2");
+        mff.assertContentEquals("header\n42,4750.89,John Doe,123 My Street,My 
City,MS,11111,USA\n43,48212.38,John Doe,123 My Street,My City,MS,11111,USA\n");
+    }
+
+    @Test
+    public void testForkExtractNotAnArray() throws IOException, 
MalformedRecordException, InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<RecordField> fieldsWrite = getDefaultFields();
+        fieldsWrite.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
+
+        final JsonRecordReader readerService = new JsonRecordReader(schema);
+        final MockRecordWriter writerService = new 
CustomRecordWriter("header", false, schemaWrite);
+
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+        runner.setProperty("my-path", "/country");
+
+        runner.enqueue(new 
File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath());
+        runner.run(1);
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
+        mff.assertAttributeEquals("record.count", "0");
+    }
+
+    @Test
+    public void testForkExtractNotAnArrayOfRecords() throws IOException, 
MalformedRecordException, InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final DataType accountRecordType = 
RecordFieldType.STRING.getDataType();
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<RecordField> fieldsWrite = getDefaultFields();
+        fieldsWrite.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
+
+        final JsonRecordReader readerService = new JsonRecordReader(schema);
+        final MockRecordWriter writerService = new 
CustomRecordWriter("header", false, schemaWrite);
+
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+        runner.setProperty("my-path", "/accounts");
+
+        runner.enqueue(new 
File("src/test/resources/TestForkRecord/single-element-nested-array-strings.json").toPath());
+        runner.run(1);
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
+        mff.assertAttributeEquals("record.count", "0");
+    }
+
+    @Test
+    public void testForkExtractComplexWithParentFields() throws IOException, 
MalformedRecordException, InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountWithTransactionSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<RecordField> fieldsWrite = getDefaultFields();
+        fieldsWrite.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        fieldsWrite.add(new RecordField("amount", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
+
+        final JsonRecordReader readerService = new JsonRecordReader(schema);
+        final MockRecordWriter writerService = new 
CustomRecordWriter("header", false, schemaWrite);
+
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+        runner.setProperty("my-path", "/accounts[*]/transactions");
+
+        runner.enqueue(new 
File("src/test/resources/TestForkRecord/single-element-nested-nested-array.json").toPath());
+        runner.run(1);
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
+        mff.assertAttributeEquals("record.count", "4");
+        mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My 
City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My 
City,MS,11111,USA,4750.89\n"
+                + "7,36.78,John Doe,123 My Street,My 
City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My 
City,MS,11111,USA,48212.38\n");
+    }
+
+    @Test
+    public void testForkExtractComplexWithoutParentFields() throws 
IOException, MalformedRecordException, InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountWithTransactionSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<RecordField> fieldsWrite = new ArrayList<RecordField>();
+        fieldsWrite.add(new RecordField("id", 
RecordFieldType.INT.getDataType()));
+        fieldsWrite.add(new RecordField("amount", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
+
+        final JsonRecordReader readerService = new JsonRecordReader(schema);
+        final MockRecordWriter writerService = new 
CustomRecordWriter("header", false, schemaWrite);
+
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false");
+        runner.setProperty("my-path", "/accounts[*]/transactions");
+
+        runner.enqueue(new 
File("src/test/resources/TestForkRecord/single-element-nested-nested-array.json").toPath());
+        runner.run(1);
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
+        mff.assertAttributeEquals("record.count", "4");
+        
mff.assertContentEquals("header\n5,150.31\n6,-15.31\n7,36.78\n8,-21.34\n");
+    }
+
+    @Test
+    public void testForkExtractComplexWithParentFieldsAndNull() throws 
IOException, MalformedRecordException, InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+        final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountWithTransactionSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<RecordField> fieldsWrite = getDefaultFields();
+        fieldsWrite.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+        fieldsWrite.add(new RecordField("amount", 
RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
+
+        final JsonRecordReader readerService = new JsonRecordReader(schema);
+        final MockRecordWriter writerService = new 
CustomRecordWriter("header", false, schemaWrite);
+
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ForkRecord.RECORD_READER, "reader");
+        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+        runner.setProperty("my-path", "/accounts[*]/transactions");
+
+        runner.enqueue(new 
File("src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json").toPath());
+        runner.run(1);
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
+        mff.assertAttributeEquals("record.count", "4");
+        mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My 
City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My 
City,MS,11111,USA,4750.89\n"
+                + "7,36.78,John Doe,123 My Street,My 
City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My 
City,MS,11111,USA,48212.38\n");
+    }
+
+    @Test
+    public void testSplitMode() throws InitializationException, IOException {
+        String expectedOutput = null;
+        final TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("record-reader", jsonReader);
+
+        final String inputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc")));
+        final String outputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc")));
+
+        runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, 
inputSchemaText);
+        runner.enableControllerService(jsonReader);
+        runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("record-writer", jsonWriter);
+        runner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, 
outputSchemaText);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+        runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_SPLIT);
+
+        runner.setProperty("my-path", "/address");
+        
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json"));
+        runner.run();
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+        expectedOutput = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/split-address.json")));
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
 "5");
+
+        runner.clearTransferState();
+        runner.setProperty("my-path", "/bankAccounts[*]/last5Transactions");
+        
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json"));
+        runner.run();
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+        expectedOutput = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/split-transactions.json")));
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
 "6");
+    }
+
+    @Test
+    public void testExtractMode() throws InitializationException, IOException {
+        String expectedOutput = null;
+        final TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("record-reader", jsonReader);
+
+        final String inputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc")));
+        final String outputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/extract-schema.avsc")));
+
+        runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, 
inputSchemaText);
+        runner.enableControllerService(jsonReader);
+        runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("record-writer", jsonWriter);
+        runner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, 
outputSchemaText);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+        runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+
+        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+
+        runner.setProperty("my-path", "/bankAccounts[*]/last5Transactions");
+        
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json"));
+        runner.run();
+        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+        expectedOutput = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-transactions.json")));
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+        
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count",
 "6");
+    }
+
+    private class JsonRecordReader extends AbstractControllerService 
implements RecordReaderFactory {
+
+        RecordSchema schema;
+
+        public JsonRecordReader(RecordSchema schema) {
+            this.schema = schema;
+        }
+
+        @Override
+        public RecordReader createRecordReader(FlowFile flowFile, InputStream 
in, ComponentLog logger) throws MalformedRecordException, IOException, 
SchemaNotFoundException {
+            return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, 
timeFormat, timestampFormat);
+        }
+
+        @Override
+        public RecordReader createRecordReader(Map<String, String> variables, 
InputStream in, ComponentLog logger) throws MalformedRecordException, 
IOException, SchemaNotFoundException {
+            return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, 
timeFormat, timestampFormat);
+        }
+
+    }
+
+    private class CustomRecordWriter extends MockRecordWriter {
+
+        RecordSchema schema;
+
+        public CustomRecordWriter(final String header, final boolean 
quoteValues, RecordSchema schema) {
+            super(header, quoteValues);
+            this.schema = schema;
+        }
+
+        @Override
+        public RecordSchema getSchema(Map<String, String> variables, 
RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+            return this.schema;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json
new file mode 100644
index 0000000..303198a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json
@@ -0,0 +1,99 @@
+[
+       {
+               "id": 1,
+               "name": {
+                       "last": "Doe",
+                       "first": "John"
+               },
+               "address": [],
+               "bankAccounts": null
+       }, {
+               "id": 2,
+               "name": {
+                       "last": "Smith",
+                       "first": "John"
+               },
+               "address": [null],
+               "bankAccounts": null
+       }, {
+               "id": 3,
+               "name": {
+                       "last": "Smith",
+                       "first": "Jane"
+               },
+               "address": [
+                       {
+                               "id": "home",
+                               "street": "1 nifi street",
+                               "city": "nifi city"
+                       }, {
+                               "id": "work",
+                               "street": "1 nifi avenue",
+                               "city": "apache city"
+                       }
+               ],
+               "bankAccounts": [
+                       {
+                               "bankID": "nifi bank",
+                               "IBAN": "myIBAN",
+                               "last5Transactions": null
+                       }, {
+                               "bankID": "apache bank",
+                               "IBAN": "myIBAN",
+                               "last5Transactions": [
+                                       {
+                                               "comment": "gas station",
+                                               "amount": "-45"
+                                       }, {
+                                               "comment": "hair cut",
+                                               "amount": "-19"
+                                       }
+                               ]
+                       }
+               ]
+       }, {
+               "id": 4,
+               "name": {
+                       "last": "Clark",
+                       "first": "Jane"
+               },
+               "address": [
+                       {
+                               "id": "home",
+                               "street": "10 nifi street",
+                               "city": "nifi city"
+                       }, {
+                               "id": "work",
+                               "street": "10 nifi avenue",
+                               "city": "apache city"
+                       }
+               ],
+               "bankAccounts": [
+                       {
+                               "bankID": "nifi bank",
+                               "IBAN": "myIBAN",
+                               "last5Transactions": [
+                                       {
+                                               "comment": "gift",
+                                               "amount": "+100"
+                                       }, {
+                                               "comment": "flights",
+                                               "amount": "-190"
+                                       }
+                               ]
+                       }, {
+                               "bankID": "apache bank",
+                               "IBAN": "myIBAN",
+                               "last5Transactions": [
+                                       {
+                                               "comment": "nifi tshirt",
+                                               "amount": "0"
+                                       }, {
+                                               "comment": "theatre",
+                                               "amount": "-19"
+                                       }
+                               ]
+                       }
+               ]
+       }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json
new file mode 100644
index 0000000..dcba8da
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json
@@ -0,0 +1,49 @@
+[ {
+  "comment" : "gas station",
+  "amount" : -45,
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  }
+}, {
+  "comment" : "hair cut",
+  "amount" : -19,
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  }
+}, {
+  "comment" : "gift",
+  "amount" : 100,
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  }
+}, {
+  "comment" : "flights",
+  "amount" : -190,
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  }
+}, {
+  "comment" : "nifi tshirt",
+  "amount" : 0,
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  }
+}, {
+  "comment" : "theatre",
+  "amount" : -19,
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json
new file mode 100644
index 0000000..3c35c16
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json
@@ -0,0 +1,125 @@
+[ {
+  "id" : 2,
+  "name" : {
+    "last" : "Smith",
+    "first" : "John"
+  },
+  "address" : [ null ],
+  "bankAccounts" : null
+}, {
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "1 nifi street",
+    "city" : "nifi city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : null
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gas station",
+      "amount" : -45
+    }, {
+      "comment" : "hair cut",
+      "amount" : -19
+    } ]
+  } ]
+}, {
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "work",
+    "street" : "1 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : null
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gas station",
+      "amount" : -45
+    }, {
+      "comment" : "hair cut",
+      "amount" : -19
+    } ]
+  } ]
+}, {
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "10 nifi street",
+    "city" : "nifi city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gift",
+      "amount" : 100
+    }, {
+      "comment" : "flights",
+      "amount" : -190
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "nifi tshirt",
+      "amount" : 0
+    }, {
+      "comment" : "theatre",
+      "amount" : -19
+    } ]
+  } ]
+}, {
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "work",
+    "street" : "10 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gift",
+      "amount" : 100
+    }, {
+      "comment" : "flights",
+      "amount" : -190
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "nifi tshirt",
+      "amount" : 0
+    }, {
+      "comment" : "theatre",
+      "amount" : -19
+    } ]
+  } ]
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json
new file mode 100644
index 0000000..74cfccb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json
@@ -0,0 +1,181 @@
+[ {
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "1 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "1 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : null
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gas station",
+      "amount" : -45
+    } ]
+  } ]
+}, {
+  "id" : 3,
+  "name" : {
+    "last" : "Smith",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "1 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "1 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : null
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "hair cut",
+      "amount" : -19
+    } ]
+  } ]
+}, {
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "10 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "10 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "gift",
+      "amount" : 100
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "nifi tshirt",
+      "amount" : 0
+    }, {
+      "comment" : "theatre",
+      "amount" : -19
+    } ]
+  } ]
+}, {
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "10 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "10 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "flights",
+      "amount" : -190
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "nifi tshirt",
+      "amount" : 0
+    }, {
+      "comment" : "theatre",
+      "amount" : -19
+    } ]
+  } ]
+}, {
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "10 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "10 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "flights",
+      "amount" : -190
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "nifi tshirt",
+      "amount" : 0
+    } ]
+  } ]
+}, {
+  "id" : 4,
+  "name" : {
+    "last" : "Clark",
+    "first" : "Jane"
+  },
+  "address" : [ {
+    "id" : "home",
+    "street" : "10 nifi street",
+    "city" : "nifi city"
+  }, {
+    "id" : "work",
+    "street" : "10 nifi avenue",
+    "city" : "apache city"
+  } ],
+  "bankAccounts" : [ {
+    "bankID" : "nifi bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "flights",
+      "amount" : -190
+    } ]
+  }, {
+    "bankID" : "apache bank",
+    "IBAN" : "myIBAN",
+    "last5Transactions" : [ {
+      "comment" : "theatre",
+      "amount" : -19
+    } ]
+  } ]
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc
new file mode 100644
index 0000000..39a3c04
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc
@@ -0,0 +1,21 @@
+{
+       "name": "personWithNameRecord",
+       "namespace": "nifi",
+       "type": "record",
+       "fields": [
+               { "name" : "comment", "type": "string" },
+               { "name" : "amount", "type": "long" },
+               { "name": "id", "type": "int" },
+               { "name": "name", "type":
+                       {
+                               "type": "record",
+                               "name": "nameRecord",
+                               "fields": 
+                                       [
+                                               { "name": "last", "type": 
"string" },
+                                               { "name": "first", "type": 
"string" }
+                                       ]
+                       }
+               }
+       ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc
new file mode 100644
index 0000000..5390973
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc
@@ -0,0 +1,56 @@
+{
+       "name": "personWithNameRecord",
+       "namespace": "nifi",
+       "type": "record",
+       "fields": [
+               { "name": "id", "type": "int" },
+               { "name": "name", "type":
+                       {
+                               "type": "record",
+                               "name": "nameRecord",
+                               "fields": 
+                                       [
+                                               { "name": "last", "type": 
"string" },
+                                               { "name": "first", "type": 
"string" }
+                                       ]
+                       }
+               },
+               { "name" : "address", "type": ["null",
+                                                                               
{ "type" : "array", "items" : {
+                                                                               
                "type" : "record",
+                                                                               
                "name" : "addressRecord",
+                                                                               
                "fields" : [
+                                                                               
                        { "name" : "id", "type": "string" },
+                                                                               
                        { "name" : "street", "type": "string" },
+                                                                               
                        { "name" : "city", "type": "string" }
+                                                                               
                        ]
+                                                                               
                }
+                                               }
+                                       ]
+        },
+               { "name" : "bankAccounts", "type": ["null",
+                                                                               
{ "type" : "array", "items" : {
+                                                                               
                "type" : "record",
+                                                                               
                "name" : "bankAccountRecord",
+                                                                               
                "fields" : [
+                                                                               
                        { "name" : "bankID", "type": "string" },
+                                                                               
                        { "name" : "IBAN", "type": "string" },
+                                                                               
                        { "name" : "last5Transactions", "type": ["null",
+                                                                               
                                                                                
                                                        { "type" : "array", 
"items" : {
+                                                                               
                                                                                
                                                                        "type" 
: "record",
+                                                                               
                                                                                
                                                                        "name" 
: "transactionRecord",
+                                                                               
                                                                                
                                                                        
"fields" : [
+                                                                               
                                                                                
                                                                                
{ "name" : "comment", "type": "string" },
+                                                                               
                                                                                
                                                                                
{ "name" : "amount", "type": "long" }
+                                                                               
                                                                                
                                                                                
]
+                                                                               
                                                                                
                                                                        }
+                                                                               
                                                                                
                        }
+                                                                               
                                                                                
                ]
+                                                                               
                        }
+                                                                               
                        ]
+                                                                               
                }
+                                               }
+                                       ]
+        }
+       ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json
new file mode 100644
index 0000000..3ecc536
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json
@@ -0,0 +1,10 @@
+{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [ "accountA", "accountB" ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json
new file mode 100644
index 0000000..0bca3a0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json
@@ -0,0 +1,16 @@
+{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [{
+               "id": 42,
+               "balance": 4750.89
+       }, {
+               "id": 43,
+               "balance": 48212.38
+       }]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json
new file mode 100644
index 0000000..eaa6e4f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json
@@ -0,0 +1,32 @@
+{
+       "id": 1,
+       "name": "John Doe",
+       "address": "123 My Street",
+       "city": "My City", 
+       "state": "MS",
+       "zipCode": "11111",
+       "country": "USA",
+       "accounts": [{
+               "id": 42,
+               "balance": 4750.89,
+               "transactions": [{
+                       "id": 5,
+                       "amount": 150.31
+               },
+               {
+                       "id": 6,
+                       "amount": -15.31
+               }]
+       }, {
+               "id": 43,
+               "balance": 48212.38,
+               "transactions": [{
+                       "id": 7,
+                       "amount": 36.78
+               },
+               {
+                       "id": 8,
+                       "amount": -21.34
+               }]
+       }]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json
new file mode 100644
index 0000000..89ade88
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json
@@ -0,0 +1,50 @@
+[
+       {
+               "id": 1,
+               "name": "John Doe",
+               "address": "123 My Street",
+               "city": "My City", 
+               "state": "MS",
+               "zipCode": "11111",
+               "country": "USA",
+               "accounts": [{
+                       "id": 42,
+                       "balance": 4750.89,
+                       "transactions": [{
+                               "id": 5,
+                               "amount": 150.31
+                       },
+                       {
+                               "id": 6,
+                               "amount": -15.31
+                       }]
+               }, {
+                       "id": 43,
+                       "balance": 48212.38,
+                       "transactions": [{
+                               "id": 7,
+                               "amount": 36.78
+                       },
+                       {
+                               "id": 8,
+                               "amount": -21.34
+                       }]
+               }]
+       }, {
+               "id": 2,
+               "name": "John Doe",
+               "address": "123 My Street",
+               "city": "My City", 
+               "state": "MS",
+               "zipCode": "11111",
+               "country": "USA",
+               "accounts": [{
+                       "id": 42,
+                       "balance": 4750.89
+               }, {
+                       "id": 43,
+                       "balance": 48212.38,
+                       "transactions": null
+               }]
+       }
+]
\ No newline at end of file

Reply via email to