Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2037#discussion_r133756360
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"fork", "record", "content", "array", "stream", "event"})
+@CapabilityDescription("This processor allows the user to fork a record
into multiple records. The user must specify a RecordPath pointing "
+ + "to a field of type ARRAY containing RECORD elements. The
generated flow file will contain the records from the specified array. "
+ + "It is also possible to add in each record all the fields of the
parent records from the root level to the record element being "
+ + "forked. However it supposes the fields to add are defined in
the schema of the Record Writer controller service. See examples in "
+ + "the additional details documentation of the processor.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "record.count", description = "The merged
FlowFile will have a 'record.count' attribute indicating the number of records "
+ + "that were written to the FlowFile."),
+ @WritesAttribute(attribute = "mime.type", description = "The MIME Type
indicated by the Record Writer"),
+ @WritesAttribute(attribute = "<Attributes from Record Writer>",
description = "Any Attribute that the configured Record Writer returns will be
added to the FlowFile.")
+})
+@SeeAlso({QueryRecord.class, SplitRecord.class, PartitionRecord.class,
ConvertRecord.class})
+public class ForkRecord extends AbstractProcessor {
+
+ private volatile RecordPathCache recordPathCache = new
RecordPathCache(25);
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
reading incoming data")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for
writing out the records")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_PATH = new
PropertyDescriptor.Builder()
+ .name("record-path")
+ .displayName("Record Path to Array")
+ .description("A RecordPath that points to an array of records
that will be forked.")
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_PARENT_FIELDS = new
PropertyDescriptor.Builder()
+ .name("include-parent-fields")
+ .displayName("Include Parent Fields")
+ .description("If set to true, all the fields from the root
level to the given array will be added as fields of "
+ + "each element of the array to fork.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final Relationship REL_FORK = new Relationship.Builder()
+ .name("fork")
+ .description("The FlowFiles containing the forked records will
be routed to this relationship")
+ .build();
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder()
+ .name("original")
+ .description("The original FlowFiles will be routed to this
relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("In case a FlowFile generates an error during the
fork operation, it will be routed to this relationship")
+ .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        // Declaration order defines the display order of the properties in the UI.
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(4);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(RECORD_PATH);
+        descriptors.add(INCLUDE_PARENT_FIELDS);
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        // Set semantics: insertion order is irrelevant here.
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_FORK);
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String resultPathText =
context.getProperty(RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath resultRecordPath =
recordPathCache.getCompiled(resultPathText);
+
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final boolean addParentFields =
context.getProperty(INCLUDE_PARENT_FIELDS).asBoolean();
+
+ final FlowFile original = flowFile;
+ final FlowFile outFlowFile = session.create(original);
+ final AtomicInteger readCount = new AtomicInteger(0);
+ final AtomicInteger writeCount = new AtomicInteger(0);
+
+ try {
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws
IOException {
+ try (final RecordReader reader =
readerFactory.createRecordReader(original, in, getLogger())) {
+
+ final RecordSchema writeSchema =
writerFactory.getSchema(original, reader.getSchema());
+ final OutputStream out =
session.write(outFlowFile);
+
+ try (final RecordSetWriter recordSetWriter =
writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+
+ recordSetWriter.beginRecordSet();
+
+ // we read each record of the input flow file
+ Record record;
+ while ((record = reader.nextRecord()) != null)
{
+
+ readCount.incrementAndGet();
+
+ // evaluate record path in each record of
the flow file
+ Iterator<FieldValue> it =
resultRecordPath.evaluate(record).getSelectedFields().iterator();
+
+ while(it.hasNext()) {
+ FieldValue fieldValue = it.next();
+ RecordFieldType fieldType =
fieldValue.getField().getDataType().getFieldType();
+
+ // we want to have an array here,
nothing else allowed
+ if(fieldType != RecordFieldType.ARRAY)
{
+ getLogger().debug("The record path
" + resultPathText + " is matching a field "
--- End diff --
In a previous version of this PR, I threw an exception and routed the flow file
to failure. IIRC, I changed to the current approach to handle the case where the
referenced array is empty in a given record: rather than failing the whole flow
file, the processor simply moves on to the next record. I could make this
behavior configurable with an additional property, though. Would that sound
better to you?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---