Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2037#discussion_r133756360
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
 ---
    @@ -0,0 +1,293 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.record.path.validation.RecordPathValidator;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.ArrayDataType;
    +
    +@SideEffectFree
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"fork", "record", "content", "array", "stream", "event"})
    +@CapabilityDescription("This processor allows the user to fork a record 
into multiple records. The user must specify a RecordPath pointing "
    +        + "to a field of type ARRAY containing RECORD elements. The 
generated flow file will contain the records from the specified array. "
    +        + "It is also possible to add in each record all the fields of the 
parent records from the root level to the record element being "
    +        + "forked. However it supposes the fields to add are defined in 
the schema of the Record Writer controller service. See examples in "
    +        + "the additional details documentation of the processor.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "record.count", description = "The merged 
FlowFile will have a 'record.count' attribute indicating the number of records "
    +            + "that were written to the FlowFile."),
    +    @WritesAttribute(attribute = "mime.type", description = "The MIME Type 
indicated by the Record Writer"),
    +    @WritesAttribute(attribute = "<Attributes from Record Writer>", 
description = "Any Attribute that the configured Record Writer returns will be 
added to the FlowFile.")
    +})
    +@SeeAlso({QueryRecord.class, SplitRecord.class, PartitionRecord.class, 
ConvertRecord.class})
    +public class ForkRecord extends AbstractProcessor {
    +
    +    private volatile RecordPathCache recordPathCache = new 
RecordPathCache(25);
    +
    +    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for 
reading incoming data")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for 
writing out the records")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_PATH = new 
PropertyDescriptor.Builder()
    +            .name("record-path")
    +            .displayName("Record Path to Array")
    +            .description("A RecordPath that points to an array of records 
that will be forked.")
    +            .addValidator(new RecordPathValidator())
    +            .expressionLanguageSupported(true)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new 
PropertyDescriptor.Builder()
    +            .name("include-parent-fields")
    +            .displayName("Include Parent Fields")
    +            .description("If set to true, all the fields from the root 
level to the given array will be added as fields of "
    +                    + "each element of the array to fork.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final Relationship REL_FORK = new Relationship.Builder()
    +            .name("fork")
    +            .description("The FlowFiles containing the forked records will 
be routed to this relationship")
    +            .build();
    +    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder()
    +            .name("original")
    +            .description("The original FlowFiles will be routed to this 
relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("In case a FlowFile generates an error during the 
fork operation, it will be routed to this relationship")
    +            .build();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(RECORD_READER);
    +        properties.add(RECORD_WRITER);
    +        properties.add(RECORD_PATH);
    +        properties.add(INCLUDE_PARENT_FIELDS);
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_ORIGINAL);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_FORK);
    +        return relationships;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String resultPathText = 
context.getProperty(RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
    +        final RecordPath resultRecordPath = 
recordPathCache.getCompiled(resultPathText);
    +
    +        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    +        final boolean addParentFields = 
context.getProperty(INCLUDE_PARENT_FIELDS).asBoolean();
    +
    +        final FlowFile original = flowFile;
    +        final FlowFile outFlowFile = session.create(original);
    +        final AtomicInteger readCount = new AtomicInteger(0);
    +        final AtomicInteger writeCount = new AtomicInteger(0);
    +
    +        try {
    +
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(final InputStream in) throws 
IOException {
    +                    try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
    +
    +                        final RecordSchema writeSchema = 
writerFactory.getSchema(original, reader.getSchema());
    +                        final OutputStream out = 
session.write(outFlowFile);
    +
    +                        try (final RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
    +
    +                            recordSetWriter.beginRecordSet();
    +
    +                            // we read each record of the input flow file
    +                            Record record;
    +                            while ((record = reader.nextRecord()) != null) 
{
    +
    +                                readCount.incrementAndGet();
    +
    +                                // evaluate record path in each record of 
the flow file
    +                                Iterator<FieldValue> it = 
resultRecordPath.evaluate(record).getSelectedFields().iterator();
    +
    +                                while(it.hasNext()) {
    +                                    FieldValue fieldValue = it.next();
    +                                    RecordFieldType fieldType = 
fieldValue.getField().getDataType().getFieldType();
    +
    +                                    // we want to have an array here, 
nothing else allowed
    +                                    if(fieldType != RecordFieldType.ARRAY) 
{
    +                                        getLogger().debug("The record path 
" + resultPathText + " is matching a field "
    --- End diff --
    
    In a previous version of this PR, I was throwing an exception and routing 
to failure. IIRC, I changed to this approach to cover the case where the array 
the path points to can be empty in a record. In that case, I don't want to fail 
the whole flow file; I would rather just move on to the next record. I can make 
this configurable with an additional property, though. Would that sound better 
to you?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to