[
https://issues.apache.org/jira/browse/NIFI-4227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16130734#comment-16130734
]
ASF GitHub Bot commented on NIFI-4227:
--------------------------------------
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2037#discussion_r133756360
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"fork", "record", "content", "array", "stream", "event"})
+@CapabilityDescription("This processor allows the user to fork a record
into multiple records. The user must specify a RecordPath pointing "
+ + "to a field of type ARRAY containing RECORD elements. The
generated flow file will contain the records from the specified array. "
+ + "It is also possible to add in each record all the fields of the
parent records from the root level to the record element being "
+            + "forked. However it requires that the fields to add be defined in
the schema of the Record Writer controller service. See examples in "
+            + "the additional details documentation of the processor.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.count", description = "The generated
FlowFile will have a 'record.count' attribute indicating the number of records "
+            + "that were written to the FlowFile."),
+ @WritesAttribute(attribute = "mime.type", description = "The MIME Type
indicated by the Record Writer"),
+ @WritesAttribute(attribute = "<Attributes from Record Writer>",
description = "Any Attribute that the configured Record Writer returns will be
added to the FlowFile.")
+})
+@SeeAlso({QueryRecord.class, SplitRecord.class, PartitionRecord.class,
ConvertRecord.class})
+public class ForkRecord extends AbstractProcessor {
+
+ private volatile RecordPathCache recordPathCache = new
RecordPathCache(25);
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
reading incoming data")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for
writing out the records")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_PATH = new
PropertyDescriptor.Builder()
+ .name("record-path")
+ .displayName("Record Path to Array")
+ .description("A RecordPath that points to an array of records
that will be forked.")
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new
PropertyDescriptor.Builder()
+ .name("include-parent-fields")
+ .displayName("Include Parent Fields")
+ .description("If set to true, all the fields from the root
level to the given array will be added as fields of "
+ + "each element of the array to fork.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final Relationship REL_FORK = new Relationship.Builder()
+ .name("fork")
+ .description("The FlowFiles containing the forked records will
be routed to this relationship")
+ .build();
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder()
+ .name("original")
+ .description("The original FlowFiles will be routed to this
relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("In case a FlowFile generates an error during the
fork operation, it will be routed to this relationship")
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER);
+ properties.add(RECORD_WRITER);
+ properties.add(RECORD_PATH);
+ properties.add(INCLUDE_PARENT_FIELDS);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_ORIGINAL);
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_FORK);
+ return relationships;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String resultPathText =
context.getProperty(RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath resultRecordPath =
recordPathCache.getCompiled(resultPathText);
+
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final boolean addParentFields =
context.getProperty(INCLUDE_PARENT_FIELDS).asBoolean();
+
+ final FlowFile original = flowFile;
+ final FlowFile outFlowFile = session.create(original);
+ final AtomicInteger readCount = new AtomicInteger(0);
+ final AtomicInteger writeCount = new AtomicInteger(0);
+
+ try {
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws
IOException {
+ try (final RecordReader reader =
readerFactory.createRecordReader(original, in, getLogger())) {
+
+ final RecordSchema writeSchema =
writerFactory.getSchema(original, reader.getSchema());
+ final OutputStream out =
session.write(outFlowFile);
+
+ try (final RecordSetWriter recordSetWriter =
writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+
+ recordSetWriter.beginRecordSet();
+
+ // we read each record of the input flow file
+ Record record;
+ while ((record = reader.nextRecord()) != null)
{
+
+ readCount.incrementAndGet();
+
+ // evaluate record path in each record of
the flow file
+ Iterator<FieldValue> it =
resultRecordPath.evaluate(record).getSelectedFields().iterator();
+
+ while(it.hasNext()) {
+ FieldValue fieldValue = it.next();
+ RecordFieldType fieldType =
fieldValue.getField().getDataType().getFieldType();
+
+ // we want to have an array here,
nothing else allowed
+ if(fieldType != RecordFieldType.ARRAY)
{
+ getLogger().debug("The record path
" + resultPathText + " is matching a field "
--- End diff --
In a previous version of this PR, I threw an exception and routed the flow file
to failure. IIRC I changed to this approach because the referenced array can be
empty in a given record; in that case I don't want to fail the whole flow file —
I'd rather skip that record and move on to the next one. I could make this
behavior configurable with an additional property, though. Does that sound
better to you?
> Create a ForkRecord processor
> -----------------------------
>
> Key: NIFI-4227
> URL: https://issues.apache.org/jira/browse/NIFI-4227
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Pierre Villard
> Assignee: Pierre Villard
> Attachments: TestForkRecord.xml
>
>
> I'd like a way to fork a record containing an array of records into multiple
> records, each one being an element of the array. In addition, if configured
> to, I'd like the option to add to each new record the parent fields.
> For example, if I've:
> {noformat}
> [{
> "id": 1,
> "name": "John Doe",
> "address": "123 My Street",
> "city": "My City",
> "state": "MS",
> "zipCode": "11111",
> "country": "USA",
> "accounts": [{
> "id": 42,
> "balance": 4750.89
> }, {
> "id": 43,
> "balance": 48212.38
> }]
> },
> {
> "id": 2,
> "name": "Jane Doe",
> "address": "345 My Street",
> "city": "Her City",
> "state": "NY",
> "zipCode": "22222",
> "country": "USA",
> "accounts": [{
> "id": 45,
> "balance": 6578.45
> }, {
> "id": 46,
> "balance": 34567.21
> }]
> }]
> {noformat}
> Then, I want to generate records looking like:
> {noformat}
> [{
> "id": 42,
> "balance": 4750.89
> }, {
> "id": 43,
> "balance": 48212.38
> }, {
> "id": 45,
> "balance": 6578.45
> }, {
> "id": 46,
> "balance": 34567.21
> }]
> {noformat}
> Or, if parent fields are included, looking like:
> {noformat}
> [{
> "name": "John Doe",
> "address": "123 My Street",
> "city": "My City",
> "state": "MS",
> "zipCode": "11111",
> "country": "USA",
> "id": 42,
> "balance": 4750.89
> }, {
> "name": "John Doe",
> "address": "123 My Street",
> "city": "My City",
> "state": "MS",
> "zipCode": "11111",
> "country": "USA",
> "id": 43,
> "balance": 48212.38
> }, {
> "name": "Jane Doe",
> "address": "345 My Street",
> "city": "Her City",
> "state": "NY",
> "zipCode": "22222",
> "country": "USA",
> "id": 45,
> "balance": 6578.45
> }, {
> "name": "Jane Doe",
> "address": "345 My Street",
> "city": "Her City",
> "state": "NY",
> "zipCode": "22222",
> "country": "USA",
> "id": 46,
> "balance": 34567.21
> }]
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)