[ 
https://issues.apache.org/jira/browse/NIFI-4227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16130734#comment-16130734
 ] 

ASF GitHub Bot commented on NIFI-4227:
--------------------------------------

Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2037#discussion_r133756360
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
 ---
    @@ -0,0 +1,293 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.record.path.validation.RecordPathValidator;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.ArrayDataType;
    +
    +@SideEffectFree
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"fork", "record", "content", "array", "stream", "event"})
    +@CapabilityDescription("This processor allows the user to fork a record 
into multiple records. The user must specify a RecordPath pointing "
    +        + "to a field of type ARRAY containing RECORD elements. The 
generated flow file will contain the records from the specified array. "
    +        + "It is also possible to add in each record all the fields of the 
parent records from the root level to the record element being "
    +        + "forked. However it supposes the fields to add are defined in 
the schema of the Record Writer controller service. See examples in "
    +        + "the additional details documentation of the processor.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The merged 
FlowFile will have a 'record.count' attribute indicating the number of records "
    +            + "that were written to the FlowFile."),
    +    @WritesAttribute(attribute = "mime.type", description = "The MIME Type 
indicated by the Record Writer"),
    +    @WritesAttribute(attribute = "<Attributes from Record Writer>", 
description = "Any Attribute that the configured Record Writer returns will be 
added to the FlowFile.")
    +})
    +@SeeAlso({QueryRecord.class, SplitRecord.class, PartitionRecord.class, 
ConvertRecord.class})
    +public class ForkRecord extends AbstractProcessor {
    +
    +    private volatile RecordPathCache recordPathCache = new 
RecordPathCache(25);
    +
    +    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for 
reading incoming data")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for 
writing out the records")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_PATH = new 
PropertyDescriptor.Builder()
    +            .name("record-path")
    +            .displayName("Record Path to Array")
    +            .description("A RecordPath that points to an array of records 
that will be forked.")
    +            .addValidator(new RecordPathValidator())
    +            .expressionLanguageSupported(true)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new 
PropertyDescriptor.Builder()
    +            .name("include-parent-fields")
    +            .displayName("Include Parent Fields")
    +            .description("If set to true, all the fields from the root 
level to the given array will be added as fields of "
    +                    + "each element of the array to fork.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final Relationship REL_FORK = new Relationship.Builder()
    +            .name("fork")
    +            .description("The FlowFiles containing the forked records will 
be routed to this relationship")
    +            .build();
    +    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder()
    +            .name("original")
    +            .description("The original FlowFiles will be routed to this 
relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("In case a FlowFile generates an error during the 
fork operation, it will be routed to this relationship")
    +            .build();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(RECORD_PATH);
    +        properties.add(INCLUDE_PARENT_FIELDS);
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_ORIGINAL);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_FORK);
    +        return relationships;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String resultPathText = 
context.getProperty(RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final RecordPath resultRecordPath = 
recordPathCache.getCompiled(resultPathText);
    +
    +        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    +        final boolean addParentFields = 
context.getProperty(INCLUDE_PARENT_FIELDS).asBoolean();
    +
    +        final FlowFile original = flowFile;
    +        final FlowFile outFlowFile = session.create(original);
    +        final AtomicInteger readCount = new AtomicInteger(0);
    +        final AtomicInteger writeCount = new AtomicInteger(0);
    +
    +        try {
    +
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(final InputStream in) throws 
IOException {
    +                    try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
    +
    +                        final RecordSchema writeSchema = 
writerFactory.getSchema(original, reader.getSchema());
    +                        final OutputStream out = 
session.write(outFlowFile);
    +
    +                        try (final RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
    +
    +                            recordSetWriter.beginRecordSet();
    +
    +                            // we read each record of the input flow file
    +                            Record record;
    +                            while ((record = reader.nextRecord()) != null) 
{
    +
    +                                readCount.incrementAndGet();
    +
    +                                // evaluate record path in each record of 
the flow file
    +                                Iterator<FieldValue> it = 
resultRecordPath.evaluate(record).getSelectedFields().iterator();
    +
    +                                while(it.hasNext()) {
    +                                    FieldValue fieldValue = it.next();
    +                                    RecordFieldType fieldType = 
fieldValue.getField().getDataType().getFieldType();
    +
    +                                    // we want to have an array here, 
nothing else allowed
    +                                    if(fieldType != RecordFieldType.ARRAY) 
{
    +                                        getLogger().debug("The record path 
" + resultPathText + " is matching a field "
    --- End diff --
    
    In a previous version of this PR, I was throwing an exception and routing 
to failure. IIRC, I changed to this approach to handle the case where the array 
the record path points to is empty in a record. In that case, I don't want to 
fail the whole flow file; I'd rather skip that record and move on to the next 
one. I can make this configurable with an additional property, though. Does 
that sound better to you?


> Create a ForkRecord processor
> -----------------------------
>
>                 Key: NIFI-4227
>                 URL: https://issues.apache.org/jira/browse/NIFI-4227
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>         Attachments: TestForkRecord.xml
>
>
> I'd like a way to fork a record containing an array of records into multiple 
> records, each one being an element of the array. In addition, I'd like an 
> option to add the parent fields to each new record.
> For example, if I have:
> {noformat}
> [{
>       "id": 1,
>       "name": "John Doe",
>       "address": "123 My Street",
>       "city": "My City", 
>       "state": "MS",
>       "zipCode": "11111",
>       "country": "USA",
>       "accounts": [{
>               "id": 42,
>               "balance": 4750.89
>       }, {
>               "id": 43,
>               "balance": 48212.38
>       }]
> }, 
> {
>       "id": 2,
>       "name": "Jane Doe",
>       "address": "345 My Street",
>       "city": "Her City", 
>       "state": "NY",
>       "zipCode": "22222",
>       "country": "USA",
>       "accounts": [{
>               "id": 45,
>               "balance": 6578.45
>       }, {
>               "id": 46,
>               "balance": 34567.21
>       }]
> }]
> {noformat}
> Then, I want to generate records looking like:
> {noformat}
> [{
>       "id": 42,
>       "balance": 4750.89
> }, {
>       "id": 43,
>       "balance": 48212.38
> }, {
>       "id": 45,
>       "balance": 6578.45
> }, {
>       "id": 46,
>       "balance": 34567.21
> }]
> {noformat}
> Or, if parent fields are included, looking like:
> {noformat}
> [{
>       "name": "John Doe",
>       "address": "123 My Street",
>       "city": "My City", 
>       "state": "MS",
>       "zipCode": "11111",
>       "country": "USA",
>       "id": 42,
>       "balance": 4750.89
> }, {
>       "name": "John Doe",
>       "address": "123 My Street",
>       "city": "My City", 
>       "state": "MS",
>       "zipCode": "11111",
>       "country": "USA",
>       "id": 43,
>       "balance": 48212.38
> }, {
>       "name": "Jane Doe",
>       "address": "345 My Street",
>       "city": "Her City", 
>       "state": "NY",
>       "zipCode": "22222",
>       "country": "USA",
>       "id": 45,
>       "balance": 6578.45
> }, {
>       "name": "Jane Doe",
>       "address": "345 My Street",
>       "city": "Her City", 
>       "state": "NY",
>       "zipCode": "22222",
>       "country": "USA",
>       "id": 46,
>       "balance": 34567.21
> }]
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to