Repository: nifi Updated Branches: refs/heads/master 3b3d6d4eb -> 397e88c85
NIFI-4227 - add a ForkRecord processor Added split/extract modes, unit tests, and additional details Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be0ed704 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be0ed704 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be0ed704 Branch: refs/heads/master Commit: be0ed704231fb51ed233b58575e1378fc2c93e1d Parents: 3b3d6d4 Author: Pierre Villard <[email protected]> Authored: Tue Jul 25 17:41:49 2017 +0200 Committer: Mark Payne <[email protected]> Committed: Thu May 24 15:55:17 2018 -0400 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 10 + .../nifi/processors/standard/ForkRecord.java | 378 +++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../additionalDetails.html | 481 ++++++++++++++++++ .../processors/standard/TestForkRecord.java | 486 +++++++++++++++++++ .../input/complex-input-json.json | 99 ++++ .../output/extract-transactions.json | 49 ++ .../TestForkRecord/output/split-address.json | 125 +++++ .../output/split-transactions.json | 181 +++++++ .../TestForkRecord/schema/extract-schema.avsc | 21 + .../resources/TestForkRecord/schema/schema.avsc | 56 +++ .../single-element-nested-array-strings.json | 10 + .../single-element-nested-array.json | 16 + .../single-element-nested-nested-array.json | 32 ++ .../two-elements-nested-nested-array-null.json | 50 ++ 15 files changed, 1995 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 
6052daf..6f1a77d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -483,6 +483,16 @@ <exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude> <exclude>src/test/resources/xxe_template.xml</exclude> <exclude>src/test/resources/xxe_from_report.xml</exclude> + <exclude>src/test/resources/TestForkRecord/single-element-nested-array-strings.json</exclude> + <exclude>src/test/resources/TestForkRecord/single-element-nested-array.json</exclude> + <exclude>src/test/resources/TestForkRecord/single-element-nested-nested-array.json</exclude> + <exclude>src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json</exclude> + <exclude>src/test/resources/TestForkRecord/input/complex-input-json.json</exclude> + <exclude>src/test/resources/TestForkRecord/output/extract-transactions.json</exclude> + <exclude>src/test/resources/TestForkRecord/output/split-address.json</exclude> + <exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude> + <exclude>src/test/resources/TestForkRecord/schema/extract-schema.avsc</exclude> + <exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java new file mode 100644 index 0000000..2c28603 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import 
org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.util.RecordPathCache; +import org.apache.nifi.record.path.validation.RecordPathValidator; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; + +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"fork", "record", "content", "array", "stream", "event"}) +@CapabilityDescription("This processor allows the user to fork a record 
into multiple records. The user must specify at least one " + + "Record Path, as a dynamic property, pointing to a field of type ARRAY containing RECORD objects. The processor accepts " + + "two modes: 'split' and 'extract'. In both modes, there is one record generated per element contained in the designated " + + "array. In the 'split' mode, each generated record will preserve the same schema as given in the input but the array " + + "will contain only one element. In the 'extract' mode, the element of the array must be of record type and will be the " + + "generated record. Additionally, in the 'extract' mode, it is possible to specify if each generated record should contain " + + "all the fields of the parent records from the root level to the extracted record. This assumes that the fields to add in " + + "the record are defined in the schema of the Record Writer controller service. See examples in the additional details " + + "documentation of this processor.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "The generated FlowFile will have a 'record.count' attribute indicating " + + "the number of records that were written to the FlowFile."), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"), + @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer " + + "returns will be added to the FlowFile.") +}) +public class ForkRecord extends AbstractProcessor { + + private volatile RecordPathCache recordPathCache = new RecordPathCache(25); + + static final AllowableValue MODE_EXTRACT = new AllowableValue("extract", "Extract", + "Generated records will be the elements of the array"); + static final AllowableValue MODE_SPLIT = new AllowableValue("split", "Split", + "Generated records will preserve the input schema and will contain a one-element array"); + + public static final PropertyDescriptor RECORD_READER = 
new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("fork-mode") + .displayName("Mode") + .description("Specifies the forking mode of the processor") + .allowableValues(MODE_EXTRACT, MODE_SPLIT) + .defaultValue(MODE_SPLIT.getValue()) + .required(true) + .build(); + + public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new PropertyDescriptor.Builder() + .name("include-parent-fields") + .displayName("Include Parent Fields") + .description("This parameter is only valid with the 'extract' mode. 
If set to true, all the fields " + + "from the root level to the given array will be added as fields of each element of the " + + "array to fork.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final Relationship REL_FORK = new Relationship.Builder() + .name("fork") + .description("The FlowFiles containing the forked records will be routed to this relationship") + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFiles will be routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("In case a FlowFile generates an error during the fork operation, it will be routed to this relationship") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + properties.add(MODE); + properties.add(INCLUDE_PARENT_FIELDS); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + relationships.add(REL_FORK); + return relationships; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext)); + Validator validator = new RecordPathValidator(); + + Map<PropertyDescriptor, String> processorProperties = validationContext.getProperties(); + for (final Map.Entry<PropertyDescriptor, String> entry : processorProperties.entrySet()) { + PropertyDescriptor property = entry.getKey(); + if (property.isDynamic() && property.isExpressionLanguageSupported()) { + String dynamicValue = validationContext.getProperty(property).getValue(); + if(!validationContext.isExpressionLanguagePresent(dynamicValue)) { + results.add(validator.validate(property.getDisplayName(), dynamicValue, validationContext)); + } + } + } + + return results; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final List<RecordPath> recordPaths = new ArrayList<RecordPath>(); + Map<PropertyDescriptor, String> processorProperties = context.getProperties(); + for (final Map.Entry<PropertyDescriptor, String> entry : processorProperties.entrySet()) { + PropertyDescriptor property = entry.getKey(); + if (property.isDynamic() && property.isExpressionLanguageSupported()) { + String path = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue(); + if(StringUtils.isNotBlank(path)) { + recordPaths.add(recordPathCache.getCompiled(context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue())); + } + } + } + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final boolean addParentFields = context.getProperty(INCLUDE_PARENT_FIELDS).asBoolean(); + final boolean 
isSplitMode = context.getProperty(MODE).getValue().equals(MODE_SPLIT.getValue()); + + final FlowFile original = flowFile; + final Map<String, String> originalAttributes = original.getAttributes(); + + final FlowFile outFlowFile = session.create(original); + final AtomicInteger readCount = new AtomicInteger(0); + final AtomicInteger writeCount = new AtomicInteger(0); + + try { + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { + + final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema()); + final OutputStream out = session.write(outFlowFile); + + try (final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out)) { + + recordSetWriter.beginRecordSet(); + + // we read each record of the input flow file + Record record; + while ((record = reader.nextRecord()) != null) { + + readCount.incrementAndGet(); + + for(RecordPath recordPath : recordPaths) { + + // evaluate record path in each record of the flow file + Iterator<FieldValue> it = recordPath.evaluate(record).getSelectedFields().iterator(); + + while(it.hasNext()) { + FieldValue fieldValue = it.next(); + RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType(); + + // we want to have an array here, nothing else allowed + if(fieldType != RecordFieldType.ARRAY) { + getLogger().debug("The record path " + recordPath.getPath() + " is matching a field " + + "of type " + fieldType + " when the type ARRAY is expected."); + continue; + } + + if(isSplitMode) { + + Object[] items = (Object[]) fieldValue.getValue(); + for (Object item : items) { + fieldValue.updateValue(new Object[]{item}); + recordSetWriter.write(record); + } + + } else { + + // we get the type of the elements of the array + final ArrayDataType arrayDataType = (ArrayDataType) 
fieldValue.getField().getDataType(); + final DataType elementType = arrayDataType.getElementType(); + + // we want to have records in the array + if(elementType.getFieldType() != RecordFieldType.RECORD) { + getLogger().debug("The record path " + recordPath.getPath() + " is matching an array field with " + + "values of type " + elementType.getFieldType() + " when the type RECORD is expected."); + continue; + } + + Object[] records = (Object[]) fieldValue.getValue(); + for (Object elementRecord : records) { + + if(elementRecord == null) { + continue; + } + + Record recordToWrite = (Record) elementRecord; + + if(addParentFields) { + // in this case we want to recursively add the parent fields into the record to write + // but we need to ensure that the Record has the appropriate schema for that + recordToWrite.incorporateSchema(writeSchema); + recursivelyAddParentFields(recordToWrite, fieldValue); + } + + recordSetWriter.write(recordToWrite); + } + + } + + } + + } + } + + final WriteResult writeResult = recordSetWriter.finishRecordSet(); + + try { + recordSetWriter.close(); + } catch (final IOException ioe) { + getLogger().warn("Failed to close Writer for {}", new Object[] {outFlowFile}); + } + + final Map<String, String> attributes = new HashMap<>(); + writeCount.set(writeResult.getRecordCount()); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK); + } + + } catch (final SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Could not parse incoming data: " + e.getLocalizedMessage(), e); + } + } + + private void recursivelyAddParentFields(Record recordToWrite, FieldValue fieldValue) { + try { + // we get the parent data + FieldValue parentField = fieldValue.getParent().get(); + Record parentRecord = 
fieldValue.getParentRecord().get(); + + // for each field of the parent + for (String field : parentRecord.getSchema().getFieldNames()) { + // if and only if there is not an already existing field with this name + // (we want to give priority to the deeper existing fields) + if(recordToWrite.getValue(field) == null) { + // Updates the value of the field with the given name to the given value. + // If the field specified is not present in the schema, will do nothing. + recordToWrite.setValue(field, parentRecord.getValue(field)); + } + } + + // recursive call + recursivelyAddParentFields(recordToWrite, parentField); + } catch (NoSuchElementException e) { + return; + } + } + }); + + } catch (Exception e) { + getLogger().error("Failed to fork {}", new Object[] {flowFile, e}); + session.remove(outFlowFile); + session.transfer(original, REL_FAILURE); + return; + } + + session.adjustCounter("Records Processed", readCount.get(), false); + session.adjustCounter("Records Generated", writeCount.get(), false); + getLogger().debug("Successfully forked {} records into {} records in {}", new Object[] {readCount.get(), writeCount.get(), flowFile}); + session.transfer(original, REL_ORIGINAL); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a93b37f..a15ccf5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -35,6 +35,7 @@ org.apache.nifi.processors.standard.ExtractText org.apache.nifi.processors.standard.FetchSFTP org.apache.nifi.processors.standard.FetchFile org.apache.nifi.processors.standard.FlattenJson +org.apache.nifi.processors.standard.ForkRecord org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GetFile org.apache.nifi.processors.standard.GetFTP http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html new file mode 100644 index 0000000..5cf4c49 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html @@ -0,0 +1,481 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>ForkRecord</title> + + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <p> + ForkRecord allows the user to fork a record into multiple records. To do that, the user must specify + one or multiple <a href="../../../../../html/record-path-guide.html">RecordPath</a> (as dynamic + properties of the processor) pointing to a field of type ARRAY containing RECORD elements. + </p> + <p> + The processor accepts two modes: + <ul> + <li>Split mode - in this mode, the generated records will have the same schema as the input. For + every element in the array, one record will be generated and the array will only contain this + element.</li> + <li>Extract mode - in this mode, the generated records will be the elements contained in the array. + Besides, it is also possible to add in each record all the fields of the parent records from the root + level to the record element being forked. However it supposes the fields to add are defined in the + schema of the Record Writer controller service. </li> + </ul> + </p> + + <h2>Examples</h2> + + <h3>EXTRACT mode</h3> + + <p> + To better understand how this Processor works, we will lay out a few examples. 
For the sake of these examples, let's assume that our input + data is JSON formatted and looks like this: + </p> + +<code> +<pre> +[{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 42, + "balance": 4750.89 + }, { + "id": 43, + "balance": 48212.38 + }] +}, +{ + "id": 2, + "name": "Jane Doe", + "address": "345 My Street", + "city": "Her City", + "state": "NY", + "zipCode": "22222", + "country": "USA", + "accounts": [{ + "id": 45, + "balance": 6578.45 + }, { + "id": 46, + "balance": 34567.21 + }] +}] +</pre> +</code> + + + <h4>Example 1 - Extracting without parent fields</h4> + + <p> + For this case, we want to create one record per <code>account</code> and we don't care about + the other fields. We'll add a dynamic property "path" set to <code>/accounts</code>. The resulting + flow file will contain 4 records and will look like (assuming the Record Writer schema is correctly set): + </p> + +<code> +<pre> +[{ + "id": 42, + "balance": 4750.89 +}, { + "id": 43, + "balance": 48212.38 +}, { + "id": 45, + "balance": 6578.45 +}, { + "id": 46, + "balance": 34567.21 +}] +</pre> +</code> + + + <h4>Example 2 - Extracting with parent fields</h4> + + <p> + Now, if we set the property "Include parent fields" to true, this will recursively include + the parent fields into the output records assuming the Record Writer schema allows it. In + case multiple fields have the same name (like we have in this example for <code>id</code>), + the child field will have the priority over all the parent fields sharing the same name. In + this case, the <code>id</code> of the array <code>accounts</code> will be saved in the + forked records. 
The resulting flow file will contain 4 records and will look like: + </p> + +<code> +<pre> +[{ + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "id": 42, + "balance": 4750.89 +}, { + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "id": 43, + "balance": 48212.38 +}, { + "name": "Jane Doe", + "address": "345 My Street", + "city": "Her City", + "state": "NY", + "zipCode": "22222", + "country": "USA", + "id": 45, + "balance": 6578.45 +}, { + "name": "Jane Doe", + "address": "345 My Street", + "city": "Her City", + "state": "NY", + "zipCode": "22222", + "country": "USA", + "id": 46, + "balance": 34567.21 +}] +</pre> +</code> + + + <h4>Example 3 - Multi-nested arrays</h4> + + <p> + Now let's say that the input record contains multi-nested arrays like the below example: + </p> + +<code> +<pre> +[{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 42, + "balance": 4750.89, + "transactions": [{ + "id": 5, + "amount": 150.31 + }, + { + "id": 6, + "amount": -15.31 + }] + }, { + "id": 43, + "balance": 48212.38, + "transactions": [{ + "id": 7, + "amount": 36.78 + }, + { + "id": 8, + "amount": -21.34 + }] + }] +}] +</pre> +</code> + + <p> + If we want to have one record per <code>transaction</code> for each <code>account</code>, then + the Record Path should be set to <code>/accounts[*]/transactions</code>. 
If we have the following + schema for our Record Reader: + </p> + +<code> +<pre> +{ + "type" : "record", + "name" : "bank", + "fields" : [ { + "name" : "id", + "type" : "int" + }, { + "name" : "name", + "type" : "string" + }, { + "name" : "address", + "type" : "string" + }, { + "name" : "city", + "type" : "string" + }, { + "name" : "state", + "type" : "string" + }, { + "name" : "zipCode", + "type" : "string" + }, { + "name" : "country", + "type" : "string" + }, { + "name" : "accounts", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "accounts", + "fields" : [ { + "name" : "id", + "type" : "int" + }, { + "name" : "balance", + "type" : "double" + }, { + "name" : "transactions", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "transactions", + "fields" : [ { + "name" : "id", + "type" : "int" + }, { + "name" : "amount", + "type" : "double" + } ] + } + } + } ] + } + } + } ] +} +</pre> +</code> + + <p> + And if we have the following schema for our Record Writer: + </p> + +<code> +<pre> +{ + "type" : "record", + "name" : "bank", + "fields" : [ { + "name" : "id", + "type" : "int" + }, { + "name" : "name", + "type" : "string" + }, { + "name" : "address", + "type" : "string" + }, { + "name" : "city", + "type" : "string" + }, { + "name" : "state", + "type" : "string" + }, { + "name" : "zipCode", + "type" : "string" + }, { + "name" : "country", + "type" : "string" + }, { + "name" : "amount", + "type" : "double" + }, { + "name" : "balance", + "type" : "double" + } ] +} +</pre> +</code> + + <p> + Then, if we include the parent fields, we'll have 4 records as below: + </p> + +<code> +<pre> +[ { + "id" : 5, + "name" : "John Doe", + "address" : "123 My Street", + "city" : "My City", + "state" : "MS", + "zipCode" : "11111", + "country" : "USA", + "amount" : 150.31, + "balance" : 4750.89 +}, { + "id" : 6, + "name" : "John Doe", + "address" : "123 My Street", + "city" : "My City", + "state" : "MS", + "zipCode" : "11111", + 
"country" : "USA", + "amount" : -15.31, + "balance" : 4750.89 +}, { + "id" : 7, + "name" : "John Doe", + "address" : "123 My Street", + "city" : "My City", + "state" : "MS", + "zipCode" : "11111", + "country" : "USA", + "amount" : 36.78, + "balance" : 48212.38 +}, { + "id" : 8, + "name" : "John Doe", + "address" : "123 My Street", + "city" : "My City", + "state" : "MS", + "zipCode" : "11111", + "country" : "USA", + "amount" : -21.34, + "balance" : 48212.38 +} ] +</pre> +</code> + + <h3>SPLIT mode</h3> + + <h4>Example</h4> + + <p> + Assuming we have the below data and we added a property "path" set to <code>/accounts</code>: + </p> + +<code> +<pre> +[{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 42, + "balance": 4750.89 + }, { + "id": 43, + "balance": 48212.38 + }] +}, +{ + "id": 2, + "name": "Jane Doe", + "address": "345 My Street", + "city": "Her City", + "state": "NY", + "zipCode": "22222", + "country": "USA", + "accounts": [{ + "id": 45, + "balance": 6578.45 + }, { + "id": 46, + "balance": 34567.21 + }] +}] +</pre> +</code> + + <p> + Then we'll get 4 records as below: + </p> + +<code> +<pre> +[{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 42, + "balance": 4750.89 + }] +}, +{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 43, + "balance": 48212.38 + }] +}, +{ + "id": 2, + "name": "Jane Doe", + "address": "345 My Street", + "city": "Her City", + "state": "NY", + "zipCode": "22222", + "country": "USA", + "accounts": [{ + "id": 45, + "balance": 6578.45 + }] +}, +{ + "id": 2, + "name": "Jane Doe", + "address": "345 My Street", + "city": "Her City", + "state": "NY", + "zipCode": "22222", + "country": 
"USA", + "accounts": [{ + "id": 46, + "balance": 34567.21 + }] +}] +</pre> +</code> + + </body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java new file mode 100644 index 0000000..562b8f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.json.JsonTreeRowRecordReader; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestForkRecord { + + private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); + private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); + private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); + + private List<RecordField> getDefaultFields() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("address", 
RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("city", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("state", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("country", RecordFieldType.STRING.getDataType())); + return fields; + } + + private RecordSchema getAccountSchema() { + final List<RecordField> accountFields = new ArrayList<>(); + accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + return new SimpleRecordSchema(accountFields); + } + + private RecordSchema getAccountWithTransactionSchema() { + final List<RecordField> accountFields = new ArrayList<>(); + accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + + final DataType transactionRecordType = RecordFieldType.RECORD.getRecordDataType(getTransactionSchema()); + final DataType transactionsType = RecordFieldType.ARRAY.getArrayDataType(transactionRecordType); + accountFields.add(new RecordField("transactions", transactionsType)); + + return new SimpleRecordSchema(accountFields); + } + + private RecordSchema getTransactionSchema() { + final List<RecordField> transactionFields = new ArrayList<>(); + transactionFields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + transactionFields.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType())); + return new SimpleRecordSchema(transactionFields); + } + + @Test + public void testForkExtractSimpleWithoutParentFields() throws IOException, MalformedRecordException, InitializationException { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final DataType 
accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final JsonRecordReader readerService = new JsonRecordReader(schema); + final MockRecordWriter writerService = new CustomRecordWriter("header", false, getAccountSchema()); + + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ForkRecord.RECORD_READER, "reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty("my-path", "/accounts"); + + runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath()); + runner.run(1); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0); + mff.assertAttributeEquals("record.count", "2"); + mff.assertContentEquals("header\n42,4750.89\n43,48212.38\n"); + } + + @Test + public void testForkExtractSimpleWithParentFields() throws IOException, MalformedRecordException, InitializationException { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final List<RecordField> fieldsWrite = getDefaultFields(); + fieldsWrite.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite); + + final JsonRecordReader readerService = new JsonRecordReader(schema); + final MockRecordWriter writerService = new CustomRecordWriter("header", false, schemaWrite); + + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ForkRecord.RECORD_READER, "reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + runner.setProperty("my-path", "/accounts"); + + runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath()); + runner.run(1); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0); + mff.assertAttributeEquals("record.count", "2"); + mff.assertContentEquals("header\n42,4750.89,John Doe,123 My Street,My City,MS,11111,USA\n43,48212.38,John Doe,123 My Street,My City,MS,11111,USA\n"); + } + + @Test + public void testForkExtractNotAnArray() throws IOException, MalformedRecordException, InitializationException { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final List<RecordField> fieldsWrite = getDefaultFields(); + fieldsWrite.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite); + + final JsonRecordReader readerService = new JsonRecordReader(schema); + final MockRecordWriter writerService = new CustomRecordWriter("header", false, schemaWrite); + + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ForkRecord.RECORD_READER, "reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + runner.setProperty("my-path", "/country"); + + runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath()); + runner.run(1); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0); + mff.assertAttributeEquals("record.count", "0"); + } + + @Test + public void testForkExtractNotAnArrayOfRecords() throws IOException, MalformedRecordException, InitializationException { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final DataType accountRecordType = RecordFieldType.STRING.getDataType(); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final List<RecordField> fieldsWrite = getDefaultFields(); + fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite); + + final JsonRecordReader readerService = new 
JsonRecordReader(schema); + final MockRecordWriter writerService = new CustomRecordWriter("header", false, schemaWrite); + + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ForkRecord.RECORD_READER, "reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + runner.setProperty("my-path", "/accounts"); + + runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array-strings.json").toPath()); + runner.run(1); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0); + mff.assertAttributeEquals("record.count", "0"); + } + + @Test + public void testForkExtractComplexWithParentFields() throws IOException, MalformedRecordException, InitializationException { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountWithTransactionSchema()); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final List<RecordField> fieldsWrite = getDefaultFields(); + fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + fieldsWrite.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite); + + final JsonRecordReader readerService = new JsonRecordReader(schema); + final 
MockRecordWriter writerService = new CustomRecordWriter("header", false, schemaWrite); + + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ForkRecord.RECORD_READER, "reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + runner.setProperty("my-path", "/accounts[*]/transactions"); + + runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-nested-array.json").toPath()); + runner.run(1); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0); + mff.assertAttributeEquals("record.count", "4"); + mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n" + + "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n"); + } + + @Test + public void testForkExtractComplexWithoutParentFields() throws IOException, MalformedRecordException, InitializationException { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountWithTransactionSchema()); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final List<RecordField> fieldsWrite = new ArrayList<RecordField>(); + fieldsWrite.add(new RecordField("id", 
RecordFieldType.INT.getDataType())); + fieldsWrite.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite); + + final JsonRecordReader readerService = new JsonRecordReader(schema); + final MockRecordWriter writerService = new CustomRecordWriter("header", false, schemaWrite); + + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ForkRecord.RECORD_READER, "reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false"); + runner.setProperty("my-path", "/accounts[*]/transactions"); + + runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-nested-array.json").toPath()); + runner.run(1); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0); + mff.assertAttributeEquals("record.count", "4"); + mff.assertContentEquals("header\n5,150.31\n6,-15.31\n7,36.78\n8,-21.34\n"); + } + + @Test + public void testForkExtractComplexWithParentFieldsAndNull() throws IOException, MalformedRecordException, InitializationException { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountWithTransactionSchema()); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final List<RecordField> fieldsWrite = 
getDefaultFields(); + fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + fieldsWrite.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite); + + final JsonRecordReader readerService = new JsonRecordReader(schema); + final MockRecordWriter writerService = new CustomRecordWriter("header", false, schemaWrite); + + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ForkRecord.RECORD_READER, "reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + runner.setProperty("my-path", "/accounts[*]/transactions"); + + runner.enqueue(new File("src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json").toPath()); + runner.run(1); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0); + mff.assertAttributeEquals("record.count", "4"); + mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n" + + "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n"); + } + + @Test + public void testSplitMode() throws InitializationException, IOException { + String expectedOutput = null; + final TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("record-reader", jsonReader); + + final String inputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + runner.setProperty(ForkRecord.RECORD_READER, "record-reader"); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("record-writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer"); + + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_SPLIT); + + runner.setProperty("my-path", "/address"); + runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json")); + runner.run(); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/split-address.json"))); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5"); + + runner.clearTransferState(); + runner.setProperty("my-path", "/bankAccounts[*]/last5Transactions"); + runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json")); + 
runner.run(); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/split-transactions.json"))); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "6"); + } + + @Test + public void testExtractMode() throws InitializationException, IOException { + String expectedOutput = null; + final TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("record-reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/extract-schema.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + runner.setProperty(ForkRecord.RECORD_READER, "record-reader"); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("record-writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer"); + + runner.setProperty(ForkRecord.MODE, 
ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + + runner.setProperty("my-path", "/bankAccounts[*]/last5Transactions"); + runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json")); + runner.run(); + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-transactions.json"))); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "6"); + } + + private class JsonRecordReader extends AbstractControllerService implements RecordReaderFactory { + + RecordSchema schema; + + public JsonRecordReader(RecordSchema schema) { + this.schema = schema; + } + + @Override + public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat); + } + + @Override + public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat); + } + + } + + private class CustomRecordWriter extends MockRecordWriter { + + RecordSchema schema; + + public CustomRecordWriter(final String header, final boolean quoteValues, RecordSchema schema) { + super(header, quoteValues); + this.schema = schema; + } + + @Override + public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + return this.schema; + } + + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json new file mode 100644 index 0000000..303198a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json.json @@ -0,0 +1,99 @@ +[ + { + "id": 1, + "name": { + "last": "Doe", + "first": "John" + }, + "address": [], + "bankAccounts": null + }, { + "id": 2, + "name": { + "last": "Smith", + "first": "John" + }, + "address": [null], + "bankAccounts": null + }, { + "id": 3, + "name": { + "last": "Smith", + "first": "Jane" + }, + "address": [ + { + "id": "home", + "street": "1 nifi street", + "city": "nifi city" + }, { + "id": "work", + "street": "1 nifi avenue", + "city": "apache city" + } + ], + "bankAccounts": [ + { + "bankID": "nifi bank", + "IBAN": "myIBAN", + "last5Transactions": null + }, { + "bankID": "apache bank", + "IBAN": "myIBAN", + "last5Transactions": [ + { + "comment": "gas station", + "amount": "-45" + }, { + "comment": "hair cut", + "amount": "-19" + } + ] + } + ] + }, { + "id": 4, + "name": { + "last": "Clark", + "first": "Jane" + }, + "address": [ + { + "id": "home", + "street": "10 nifi street", + "city": "nifi city" + }, { + "id": "work", + "street": "10 nifi avenue", + "city": "apache city" + } + ], + "bankAccounts": [ + { + "bankID": "nifi bank", + "IBAN": "myIBAN", + "last5Transactions": [ + { + "comment": "gift", + "amount": "+100" + }, { + "comment": "flights", + "amount": "-190" + } + ] + }, { + "bankID": "apache bank", + "IBAN": "myIBAN", + 
"last5Transactions": [ + { + "comment": "nifi tshirt", + "amount": "0" + }, { + "comment": "theatre", + "amount": "-19" + } + ] + } + ] + } +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json new file mode 100644 index 0000000..dcba8da --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-transactions.json @@ -0,0 +1,49 @@ +[ { + "comment" : "gas station", + "amount" : -45, + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + } +}, { + "comment" : "hair cut", + "amount" : -19, + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + } +}, { + "comment" : "gift", + "amount" : 100, + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + } +}, { + "comment" : "flights", + "amount" : -190, + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + } +}, { + "comment" : "nifi tshirt", + "amount" : 0, + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + } +}, { + "comment" : "theatre", + "amount" : -19, + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + } +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json new file mode 100644 index 0000000..3c35c16 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-address.json @@ -0,0 +1,125 @@ +[ { + "id" : 2, + "name" : { + "last" : "Smith", + "first" : "John" + }, + "address" : [ null ], + "bankAccounts" : null +}, { + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : null + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gas station", + "amount" : -45 + }, { + "comment" : "hair cut", + "amount" : -19 + } ] + } ] +}, { + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "address" : [ { + "id" : "work", + "street" : "1 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : null + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gas station", + "amount" : -45 + }, { + "comment" : "hair cut", + "amount" : -19 + } ] + } ] +}, { + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gift", + "amount" : 100 + }, { + "comment" : "flights", + "amount" : -190 + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : 0 + }, { 
+ "comment" : "theatre", + "amount" : -19 + } ] + } ] +}, { + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gift", + "amount" : 100 + }, { + "comment" : "flights", + "amount" : -190 + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : 0 + }, { + "comment" : "theatre", + "amount" : -19 + } ] + } ] +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json new file mode 100644 index 0000000..74cfccb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/split-transactions.json @@ -0,0 +1,181 @@ +[ { + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "1 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : null + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gas station", + "amount" : -45 + } ] + } ] +}, { + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "1 nifi 
street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "1 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : null + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "hair cut", + "amount" : -19 + } ] + } ] +}, { + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gift", + "amount" : 100 + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : 0 + }, { + "comment" : "theatre", + "amount" : -19 + } ] + } ] +}, { + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "flights", + "amount" : -190 + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : 0 + }, { + "comment" : "theatre", + "amount" : -19 + } ] + } ] +}, { + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "flights", + "amount" : -190 + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + 
"last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : 0 + } ] + } ] +}, { + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" + } ], + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "flights", + "amount" : -190 + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "theatre", + "amount" : -19 + } ] + } ] +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc new file mode 100644 index 0000000..39a3c04 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/extract-schema.avsc @@ -0,0 +1,21 @@ +{ + "name": "personWithNameRecord", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name" : "comment", "type": "string" }, + { "name" : "amount", "type": "long" }, + { "name": "id", "type": "int" }, + { "name": "name", "type": + { + "type": "record", + "name": "nameRecord", + "fields": + [ + { "name": "last", "type": "string" }, + { "name": "first", "type": "string" } + ] + } + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc new file mode 100644 index 0000000..5390973 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/schema/schema.avsc @@ -0,0 +1,56 @@ +{ + "name": "personWithNameRecord", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name", "type": + { + "type": "record", + "name": "nameRecord", + "fields": + [ + { "name": "last", "type": "string" }, + { "name": "first", "type": "string" } + ] + } + }, + { "name" : "address", "type": ["null", + { "type" : "array", "items" : { + "type" : "record", + "name" : "addressRecord", + "fields" : [ + { "name" : "id", "type": "string" }, + { "name" : "street", "type": "string" }, + { "name" : "city", "type": "string" } + ] + } + } + ] + }, + { "name" : "bankAccounts", "type": ["null", + { "type" : "array", "items" : { + "type" : "record", + "name" : "bankAccountRecord", + "fields" : [ + { "name" : "bankID", "type": "string" }, + { "name" : "IBAN", "type": "string" }, + { "name" : "last5Transactions", "type": ["null", + { "type" : "array", "items" : { + "type" : "record", + "name" : "transactionRecord", + "fields" : [ + { "name" : "comment", "type": "string" }, + { "name" : "amount", "type": "long" } + ] + } + } + ] + } + ] + } + } + ] + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json new file mode 100644 index 0000000..3ecc536 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array-strings.json @@ -0,0 +1,10 @@ +{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [ "accountA", "accountB" ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json new file mode 100644 index 0000000..0bca3a0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-array.json @@ -0,0 +1,16 @@ +{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 42, + "balance": 4750.89 + }, { + "id": 43, + "balance": 48212.38 + }] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json new file mode 100644 index 0000000..eaa6e4f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/single-element-nested-nested-array.json @@ -0,0 +1,32 @@ +{ + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 42, + "balance": 4750.89, + "transactions": [{ + "id": 5, + "amount": 150.31 + }, + { + "id": 6, + "amount": -15.31 + }] + }, { + "id": 43, + "balance": 48212.38, + "transactions": [{ + "id": 7, + "amount": 36.78 + }, + { + "id": 8, + "amount": -21.34 + }] + }] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/be0ed704/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json new file mode 100644 index 0000000..89ade88 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json @@ -0,0 +1,50 @@ +[ + { + "id": 1, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + 
"id": 42, + "balance": 4750.89, + "transactions": [{ + "id": 5, + "amount": 150.31 + }, + { + "id": 6, + "amount": -15.31 + }] + }, { + "id": 43, + "balance": 48212.38, + "transactions": [{ + "id": 7, + "amount": 36.78 + }, + { + "id": 8, + "amount": -21.34 + }] + }] + }, { + "id": 2, + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA", + "accounts": [{ + "id": 42, + "balance": 4750.89 + }, { + "id": 43, + "balance": 48212.38, + "transactions": null + }] + } +] \ No newline at end of file
