markap14 commented on a change in pull request #4948: URL: https://github.com/apache/nifi/pull/4948#discussion_r676859358
########## File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.script; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.Restriction; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.RequiredPermission; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.PushBackRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@EventDriven +@SideEffectFree +@Tags({"record", "partition", "script", "groovy", "jython", "python", "segment", "split", "group", "organize"}) +@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates the user provided script against " + + "each record in the incoming flow file. Each record is then grouped with other records sharing the same partition and a FlowFile is created for each groups of records. " + + "Two records shares the same partition if the evaluation of the script results the same return value for both. 
Those will be considered as part of the same partition.") +@Restricted(restrictions = { + @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE, + explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +}) +@WritesAttributes({ + @WritesAttribute(attribute = "partition", description = "The partition of the outgoing flow file."), + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), + @WritesAttribute(attribute = "record.count", description = "The number of records within the flow file."), + @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer."), + @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", description = "The number of partitioned FlowFiles generated from the parent FlowFile") +}) +@SeeAlso(classNames = { + "org.apache.nifi.processors.script.ScriptedTransformRecord", + "org.apache.nifi.processors.script.ScriptedRouteRecord", + "org.apache.nifi.processors.script.ScriptedValidateRecord", + "org.apache.nifi.processors.script.ScriptedFilterRecord" +}) +public class ScriptedPartitionRecord extends ScriptedRecordProcessor { + + static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are successfully partitioned will be routed to this relationship") + .build(); + static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship.") + .build(); + static final Relationship RELATIONSHIP_FAILURE = new 
Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be partitioned from the configured input format to the configured output format, " + + "the unchanged FlowFile will be routed to this relationship") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = new HashSet<>(); + + static { + RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL); + RELATIONSHIPS.add(RELATIONSHIP_SUCCESS); + RELATIONSHIPS.add(RELATIONSHIP_FAILURE); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ScriptRunner scriptRunner = pollScriptRunner(); + if (scriptRunner == null) { + // This shouldn't happen. But just in case. + session.rollback(); + return; + } + + boolean success = false; + + try { + final ScriptEvaluator evaluator; + + try { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + evaluator = createEvaluator(scriptEngine, flowFile); + } catch (final ScriptException se) { + getLogger().error("Failed to initialize script engine", se); + session.transfer(flowFile, RELATIONSHIP_FAILURE); + return; + } + + success = partition(context, session, flowFile, evaluator); + } finally { + offerScriptRunner(scriptRunner); + } + + session.transfer(flowFile, success ? 
RELATIONSHIP_ORIGINAL : RELATIONSHIP_FAILURE); + } + + private boolean partition( + final ProcessContext context, + final ProcessSession session, + final FlowFile incomingFlowFile, + final ScriptEvaluator evaluator + ) { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final Map<String, String> originalAttributes = incomingFlowFile.getAttributes(); + + try { + session.read(incomingFlowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try ( + final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger()) + ) { + final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema()); + final RecordSet recordSet = reader.createRecordSet(); + final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet); + + final Map<String, FlowFile> outgoingFlowFiles = new HashMap<>(); + final Map<String, RecordSetWriter> recordSetWriters = new HashMap<>(); + + int index = 0; + + // Reading in records and evaluate script + while (pushBackSet.isAnotherRecord()) { + final Record record = pushBackSet.next(); + final Object evaluatedValue = evaluator.evaluate(record, index++); + getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, index - 1, evaluatedValue); + + if (evaluatedValue != null && evaluatedValue instanceof String) { + final String partition = (String) evaluatedValue; + + if (!outgoingFlowFiles.containsKey(partition)) { Review comment: Can make this more efficient and arguably cleaner - instead of check if it contains key and if not, populating it, followed by calling HashMap.get(), could use the approach: ``` RecordSetWriter writer = recordSetWriters.get(partition); if 
(writer == null) { writer = ... // populate maps } writer.write(record); ``` Performing the extra HashMap.get() may seem negligible in terms of performance, but HashMap.get can be pretty expensive, as it has to calculate its own version of a hash code for every get(). So even though String.hashCode() just returns a value, HashMap.get() will re-hash that value, which is fairly expensive. And when you consider a single FlowFile may well have millions of Records, that can amount to adding a few seconds to the processing of a FlowFile. ########## File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.script; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.Restriction; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.RequiredPermission; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.PushBackRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@EventDriven +@SideEffectFree +@Tags({"record", "partition", 
"script", "groovy", "jython", "python", "segment", "split", "group", "organize"}) +@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates the user provided script against " + + "each record in the incoming flow file. Each record is then grouped with other records sharing the same partition and a FlowFile is created for each groups of records. " + + "Two records shares the same partition if the evaluation of the script results the same return value for both. Those will be considered as part of the same partition.") +@Restricted(restrictions = { + @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE, + explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +}) +@WritesAttributes({ + @WritesAttribute(attribute = "partition", description = "The partition of the outgoing flow file."), + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), + @WritesAttribute(attribute = "record.count", description = "The number of records within the flow file."), + @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer."), + @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", description = "The number of partitioned FlowFiles generated from the parent FlowFile") +}) +@SeeAlso(classNames = { + "org.apache.nifi.processors.script.ScriptedTransformRecord", + "org.apache.nifi.processors.script.ScriptedRouteRecord", + "org.apache.nifi.processors.script.ScriptedValidateRecord", + "org.apache.nifi.processors.script.ScriptedFilterRecord" +}) +public class ScriptedPartitionRecord extends 
ScriptedRecordProcessor { + + static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are successfully partitioned will be routed to this relationship") + .build(); + static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship.") + .build(); + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be partitioned from the configured input format to the configured output format, " + + "the unchanged FlowFile will be routed to this relationship") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = new HashSet<>(); + + static { + RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL); + RELATIONSHIPS.add(RELATIONSHIP_SUCCESS); + RELATIONSHIPS.add(RELATIONSHIP_FAILURE); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ScriptRunner scriptRunner = pollScriptRunner(); + if (scriptRunner == null) { + // This shouldn't happen. But just in case. 
+ session.rollback(); + return; + } + + boolean success = false; + + try { + final ScriptEvaluator evaluator; + + try { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + evaluator = createEvaluator(scriptEngine, flowFile); + } catch (final ScriptException se) { + getLogger().error("Failed to initialize script engine", se); + session.transfer(flowFile, RELATIONSHIP_FAILURE); + return; + } + + success = partition(context, session, flowFile, evaluator); + } finally { + offerScriptRunner(scriptRunner); + } + + session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : RELATIONSHIP_FAILURE); + } + + private boolean partition( + final ProcessContext context, + final ProcessSession session, + final FlowFile incomingFlowFile, + final ScriptEvaluator evaluator + ) { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final Map<String, String> originalAttributes = incomingFlowFile.getAttributes(); + + try { + session.read(incomingFlowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try ( + final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger()) + ) { + final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema()); + final RecordSet recordSet = reader.createRecordSet(); + final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet); + + final Map<String, FlowFile> outgoingFlowFiles = new HashMap<>(); + final Map<String, RecordSetWriter> recordSetWriters = new HashMap<>(); + + int index = 0; + + // Reading in records and evaluate script + while (pushBackSet.isAnotherRecord()) { + final Record record = pushBackSet.next(); + final Object evaluatedValue = evaluator.evaluate(record, 
index++); + getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, index - 1, evaluatedValue); + + if (evaluatedValue != null && evaluatedValue instanceof String) { + final String partition = (String) evaluatedValue; + + if (!outgoingFlowFiles.containsKey(partition)) { + final FlowFile outgoingFlowFile = session.create(incomingFlowFile); + final OutputStream out = session.write(outgoingFlowFile); + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, outgoingFlowFile); + + writer.beginRecordSet(); + outgoingFlowFiles.put(partition, outgoingFlowFile); + recordSetWriters.put(partition, writer); + } + + recordSetWriters.get(partition).write(record); + } else { + throw new ProcessException("Script returned a value of " + evaluatedValue + + " but this Processor requires that the object returned be an instance of String"); + } + } + + // Sending outgoing flow files + int fragmentIndex = 0; + + for (final String partition : outgoingFlowFiles.keySet()) { + final RecordSetWriter writer = recordSetWriters.get(partition); + final FlowFile outgoingFlowFile = outgoingFlowFiles.get(partition); + + final Map<String, String> attributes = new HashMap<>(incomingFlowFile.getAttributes()); + attributes.put("mime.type", writer.getMimeType()); + attributes.put("partition", partition); + attributes.put("fragment.index", String.valueOf(fragmentIndex)); + attributes.put("fragment.count", String.valueOf(outgoingFlowFiles.size())); + + try { + final WriteResult finalResult = writer.finishRecordSet(); + final int outgoingFlowFileRecords = finalResult.getRecordCount(); + attributes.put("record.count", String.valueOf(outgoingFlowFileRecords)); + writer.close(); + } catch (final IOException e) { + throw new ProcessException("Resources used for record writing might not be closed", e); + } + + session.putAllAttributes(outgoingFlowFile, attributes); + session.transfer(outgoingFlowFile, RELATIONSHIP_SUCCESS); + fragmentIndex++; + 
} + + session.adjustCounter("Record Processed", index, false); + } catch (final ScriptException | SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Failed to parse incoming FlowFile", e); + } + } + }); + + return true; + } catch (final Exception e) { + getLogger().error("Error during routing records", e); Review comment: `getLogger().error("Failed to route records for {}", flowFile, e);` perhaps? Is good to include the FlowFile in the arguments to the logger. There's a Jira already to add the ability to capture the FlowFile from the arguments in order to do something with them for bulletins, etc. Plus it just makes it easier to track through the logs. ########## File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouteRecord.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.script; + +import org.apache.nifi.annotation.behavior.DynamicRelationship; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +@Tags({"record", "routing", "script", "groovy", "jython", "python", "segment", "split", "group", "organize"}) +@CapabilityDescription( + "This processor provides the ability to route the records of the incoming FlowFile using an user-provided script. " + + "The script is expected to handle a record as argument and return with a string value. " + + "The returned value defines a route. All routes are bounded to an outgoing relationship where the record will be transferred to. " + + "Relationships are defined as dynamic properties: dynamic property names are serving as the name of the route. " + + "The value of a dynamic property defines the relationship the given record will be routed into. Multiple routes might point to the same relationship. " + + "Creation of these dynamic relationship is managed by the processor. " + + "The records, which for the script returned with an unknown relationship name are routed to the \"unmatched\" relationship. 
" + + "The records are batched: for an incoming FlowFile, all the records routed towards a given relationship are batched into one single FlowFile." +) +@SeeAlso(classNames = { + "org.apache.nifi.processors.script.ScriptedTransformRecord", + "org.apache.nifi.processors.script.ScriptedPartitionRecord", + "org.apache.nifi.processors.script.ScriptedValidateRecord", + "org.apache.nifi.processors.script.ScriptedFilterRecord" +}) +@DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's Attribute Expression Language") +public class ScriptedRouteRecord extends ScriptedRouterProcessor<String> { Review comment: @simonbence my concern there is that many users don't read documentation very carefully. Especially when it gets really verbose. I think in general, when considering usability there's a bit of a scale that goes from "Easy to use" to "More Powerful." It's not always the case - you can have things that are both super powerful and super easy to use. But for quite a while, when such a dynamic exists, NiFi has leaned too much toward "More Powerful" I think. I think we need to lean more toward "Easy to use." This is the reason that I would lean toward not including the processor. I would avoid adding things that are likely to introduce confusion or are generally not consistent with the way the rest of the processors work - unless there's a really good reason to "buck the trend." Here, I think there is a reason. But I don't think it's a "really good" reason. ########## File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.script; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.Restriction; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.RequiredPermission; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import 
org.apache.nifi.serialization.record.PushBackRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@EventDriven +@SupportsBatching +@SideEffectFree +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Restricted(restrictions = { + @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE, + explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +}) +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), + @WritesAttribute(attribute = "record.count", description = "The number of records within the flow file."), + @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.") +}) +public abstract class ScriptedRouterProcessor<T> extends ScriptedRecordProcessor { + private final Class<T> scriptResultType; + + /** + * @param scriptResultType Defines the expected result type of the user-provided script. 
+ */ + protected ScriptedRouterProcessor(final Class<T> scriptResultType) { + this.scriptResultType = scriptResultType; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ScriptRunner scriptRunner = pollScriptRunner(); + if (scriptRunner == null) { + // This shouldn't happen. But just in case. + session.rollback(); + return; + } + + boolean success = false; + + try { + final ScriptEvaluator evaluator; + + try { + final ScriptEngine scriptEngine = scriptRunner.getScriptEngine(); + evaluator = createEvaluator(scriptEngine, flowFile); + } catch (final ScriptException se) { + getLogger().error("Failed to initialize script engine", se); + session.transfer(flowFile, getFailureRelationship()); + return; + } + + success = route(context, session, flowFile, evaluator); + } finally { + offerScriptRunner(scriptRunner); + } + + session.transfer(flowFile, success ? 
getOriginalRelationship() : getFailureRelationship()); + } + + private boolean route( + final ProcessContext context, + final ProcessSession session, + final FlowFile incomingFlowFile, + final ScriptEvaluator evaluator + ) { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final Map<String, String> originalAttributes = incomingFlowFile.getAttributes(); + + try { + session.read(incomingFlowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try ( + final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger()) + ) { + final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema()); + final RecordSet recordSet = reader.createRecordSet(); + final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet); + final Map<Relationship, FlowFile> outgoingFlowFiles = new HashMap<>(); + final Map<Relationship, RecordSetWriter> recordSetWriters = new HashMap<>(); + + int index = 0; + + // Reading in records and evaluate script + while (pushBackSet.isAnotherRecord()) { + final Record record = pushBackSet.next(); + final Object evaluatedValue = evaluator.evaluate(record, index++); + getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, index - 1, evaluatedValue); + + if (evaluatedValue != null && scriptResultType.isInstance(evaluatedValue)) { + final Optional<Relationship> outgoingRelationship = resolveRelationship(scriptResultType.cast(evaluatedValue)); + + if (outgoingRelationship.isPresent()) { + if (!outgoingFlowFiles.containsKey(outgoingRelationship.get())) { Review comment: Same comment as above, checking `containsKey` vs. just `get` and check for `null`. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected]
