markap14 commented on a change in pull request #4948:
URL: https://github.com/apache/nifi/pull/4948#discussion_r666523972



##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedFilterRecord.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+@Tags({"record", "filter", "script", "groovy", "jython", "python"})
+@CapabilityDescription(
+    "This processor provides the ability to filter records out from FlowFiles 
using the user-provided script. " +
+    "Every record will be evaluated by the script which must return with a 
boolean value. " +
+    "Records with \"true\" result will be routed to the \"matching\" 
relationship in a batch. " +
+    "Other records will be filtered out."
+)
+@SeeAlso(classNames = {
+    "org.apache.nifi.processors.script.ScriptedTransformRecord",
+    "org.apache.nifi.processors.script.ScriptedValidateRecord",
+    "org.apache.nifi.processors.script.ScriptedRouteRecord",
+    "org.apache.nifi.processors.script.ScriptedPartitionRecord"
+})
+public class ScriptedFilterRecord extends ScriptedRouterProcessor<Boolean> {
+    static final Relationship RELATIONSHIP_MATCHING = new 
Relationship.Builder()
+            .name("matching")
+            .description(
+                "Matching records of the original FlowFile will be routed to 
this relationship. " +
+                "If there are no matching records, no FlowFile will be routed 
here."
+            )
+            .build();
+
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description(
+                "After successful procession, the incoming FlowFile will be 
transferred to this relationship. " +
+                "This happens regardless the number of filtered or remaining 
records.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILED = new Relationship.Builder()
+            .name("failed")

Review comment:
       Should use 'failure' instead of 'failed' to adhere to the standard 
naming convensions.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the 
user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailureRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : 
getFailureRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<Relationship, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();

Review comment:
       Again, I think the abstraction of this 
RecordBatchingProcessorFlowFileBuilder should be eliminated. All that's really 
needed is the RecordSetWriter.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile incomingFlowFile,
+            final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<String, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && evaluatedValue 
instanceof String) {
+                                final String partition = (String) 
evaluatedValue;
+
+                                if 
(!recordSetFlowFileBuilders.containsKey(partition)) {
+                                    recordSetFlowFileBuilders.put(partition, 
new RecordBatchingProcessorFlowFileBuilder(incomingFlowFile, session, 
recordSetWriterFactory));
+                                }
+
+                                final int recordCount = 
recordSetFlowFileBuilders.get(partition).addRecord(record);
+                                session.adjustCounter("Record Processed", 
recordCount, false);

Review comment:
       I think this logic is incorrect. If you have 3 records, then for the 
first record you'll adjust the counter by 1. The second time you'll adjust the 
counter by 2, and the third time you'll adjust it by 3. So you'll have called 
adjustCounter for 1 + 2 + 3 = 6 instead of adjustCounter(3).
   Should either use `session.adjustCounter("Records Processed", 1, false);` or 
wait until the end and call `session.adjustCounter("Records Processed", 
writeResult.getRecordCount(), false);` I would recommend the latter, as it 
keeps the logic cleaner - do the processing, and at the end handle updating the 
counters.
   
   Would also make sure that the name is Records with an s.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile incomingFlowFile,
+            final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<String, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && evaluatedValue 
instanceof String) {
+                                final String partition = (String) 
evaluatedValue;
+
+                                if 
(!recordSetFlowFileBuilders.containsKey(partition)) {
+                                    recordSetFlowFileBuilders.put(partition, 
new RecordBatchingProcessorFlowFileBuilder(incomingFlowFile, session, 
recordSetWriterFactory));
+                                }
+
+                                final int recordCount = 
recordSetFlowFileBuilders.get(partition).addRecord(record);
+                                session.adjustCounter("Record Processed", 
recordCount, false);
+
+                            } else {
+                                throw new ProcessException("Script returned a 
value of " + evaluatedValue
+                                        + " but this Processor requires that 
the object returned by an instance of String");
+                            }
+                        }
+
+                        // Sending outgoing flow files
+                        int fragmentIndex = 1;
+
+                        for (final Map.Entry<String, 
RecordBatchingProcessorFlowFileBuilder> entry : 
recordSetFlowFileBuilders.entrySet()) {

Review comment:
       Based on how this is used, I would eliminate this 
RecordSetFlowFileBuilder class entirely - you really just need a `Map<String, 
RecordSetWriter>`. And perhaps a parallel `Map<String, FlowFile>` that contains 
the output FlowFiles.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouteRecord.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+@Tags({"record", "routing", "script", "groovy", "jython", "python", "segment", 
"split", "group", "organize"})
+@CapabilityDescription(
+        "This processor provides the ability to route the records of the 
incoming FlowFile using an user-provided script. " +
+        "The script is expected to handle a record as argument and return with 
a string value. " +
+        "The returned value defines a route. All routes are bounded to an 
outgoing relationship where the record will be transferred to. " +
+        "Relationships are defined as dynamic properties: dynamic property 
names are serving as the name of the route. " +
+        "The value of a dynamic property defines the relationship the given 
record will be routed into. Multiple routes might point to the same 
relationship. " +
+        "Creation of these dynamic relationship is managed by the processor. " 
+
+        "The records, which for the script returned with an unknown 
relationship name are routed to the \"unmatched\" relationship. " +
+        "The records are batched: for an incoming FlowFile, all the records 
routed towards a given relationship are batched into one single FlowFile."
+)
+@SeeAlso(classNames = {
+    "org.apache.nifi.processors.script.ScriptedTransformRecord",
+    "org.apache.nifi.processors.script.ScriptedPartitionRecord",
+    "org.apache.nifi.processors.script.ScriptedValidateRecord",
+    "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+@DynamicRelationship(name = "Name from Dynamic Property", description = 
"FlowFiles that match the Dynamic Property's Attribute Expression Language")
+public class ScriptedRouteRecord extends ScriptedRouterProcessor<String> {

Review comment:
       I think the other processors are very valuable. But I question the 
necessity of this Processor. I find the configuration very confusing and not 
user friendly. The user has to understand the possible values that the script 
could return, and then the user has to go in and redefine them as property keys 
and map each of those possible values to a relationship. It feels awkward and 
creates a tight coupling between a 'partition' and a Relationship.
   
   The more conventional approach would be to just use the partition processor. 
Then, if they want to route based on that, use RouteOnAttribute. The benefits 
there are that RouteOnAttribute is far more powerful as it enables the full 
power of Expression Language, it's already well understood, it provides the 
ability to inspect the attribute value that will be used for routing, which 
makes it simpler to use in a dataflow, and it decouples the concepts of routes 
and partitions.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile incomingFlowFile,
+            final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<String, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && evaluatedValue 
instanceof String) {
+                                final String partition = (String) 
evaluatedValue;
+
+                                if 
(!recordSetFlowFileBuilders.containsKey(partition)) {
+                                    recordSetFlowFileBuilders.put(partition, 
new RecordBatchingProcessorFlowFileBuilder(incomingFlowFile, session, 
recordSetWriterFactory));
+                                }
+
+                                final int recordCount = 
recordSetFlowFileBuilders.get(partition).addRecord(record);
+                                session.adjustCounter("Record Processed", 
recordCount, false);
+
+                            } else {
+                                throw new ProcessException("Script returned a 
value of " + evaluatedValue
+                                        + " but this Processor requires that 
the object returned by an instance of String");
+                            }
+                        }
+
+                        // Sending outgoing flow files
+                        int fragmentIndex = 1;
+
+                        for (final Map.Entry<String, 
RecordBatchingProcessorFlowFileBuilder> entry : 
recordSetFlowFileBuilders.entrySet()) {
+                            final String partitionName = entry.getKey();
+                            final RecordBatchingProcessorFlowFileBuilder 
builder = entry.getValue();
+
+                            FlowFile outgoingFlowFile = builder.build();
+                            outgoingFlowFile = 
session.putAttribute(outgoingFlowFile, "partition", partitionName);

Review comment:
       Calls to `session.putAttribute` are expensive. They create a full copy 
of the HashMap that holds attributes. This method should not be used unless 
only adding a single attribute. If adding multiple attributes, should instead 
create a Map<String, String>, add all necessary attributes, and then use 
`session.putAllAttributes`

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the 
user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailureRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : 
getFailureRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<Relationship, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && 
scriptResultType.isInstance(evaluatedValue)) {
+                                final Optional<Relationship> 
outgoingRelationship = 
resolveRelationship(scriptResultType.cast(evaluatedValue));
+
+                                if (outgoingRelationship.isPresent()) {
+                                    if 
(!recordSetFlowFileBuilders.containsKey(outgoingRelationship.get())) {
+                                        
recordSetFlowFileBuilders.put(outgoingRelationship.get(), new 
RecordBatchingProcessorFlowFileBuilder(incomingFlowFile, session, 
recordSetWriterFactory));
+                                    }
+
+                                    final int recordCount = 
recordSetFlowFileBuilders.get(outgoingRelationship.get()).addRecord(record);
+                                    session.adjustCounter("Record Processed", 
recordCount, false);
+                                } else {
+                                    getLogger().debug("Record with evaluated 
value {} has no outgoing relationship determined", 
String.valueOf(evaluatedValue));
+                                }
+                            } else {
+                                throw new ProcessException("Script returned a 
value of " + evaluatedValue
+                                        + " but this Processor requires that 
the object returned by an instance of " + scriptResultType.getSimpleName());

Review comment:
       'returned be an instance' rather than 'returned by an instance'

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedFilterRecord.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+@Tags({"record", "filter", "script", "groovy", "jython", "python"})
+@CapabilityDescription(
+    "This processor provides the ability to filter records out from FlowFiles 
using the user-provided script. " +
+    "Every record will be evaluated by the script which must return with a 
boolean value. " +
+    "Records with \"true\" result will be routed to the \"matching\" 
relationship in a batch. " +
+    "Other records will be filtered out."
+)
+@SeeAlso(classNames = {
+    "org.apache.nifi.processors.script.ScriptedTransformRecord",
+    "org.apache.nifi.processors.script.ScriptedValidateRecord",
+    "org.apache.nifi.processors.script.ScriptedRouteRecord",
+    "org.apache.nifi.processors.script.ScriptedPartitionRecord"
+})
+public class ScriptedFilterRecord extends ScriptedRouterProcessor<Boolean> {
+    static final Relationship RELATIONSHIP_MATCHING = new 
Relationship.Builder()
+            .name("matching")

Review comment:
       Should use 'matched' or 'success' instead of 'matching' in order to 
stick to standard naming conventions. Would recommend 'success'.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile incomingFlowFile,
+            final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<String, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && evaluatedValue 
instanceof String) {
+                                final String partition = (String) 
evaluatedValue;
+
+                                if 
(!recordSetFlowFileBuilders.containsKey(partition)) {
+                                    recordSetFlowFileBuilders.put(partition, 
new RecordBatchingProcessorFlowFileBuilder(incomingFlowFile, session, 
recordSetWriterFactory));
+                                }
+
+                                final int recordCount = 
recordSetFlowFileBuilders.get(partition).addRecord(record);
+                                session.adjustCounter("Record Processed", 
recordCount, false);
+
+                            } else {
+                                throw new ProcessException("Script returned a 
value of " + evaluatedValue
+                                        + " but this Processor requires that 
the object returned by an instance of String");

Review comment:
       I think that's supposed to be 'be an instance' instead of 'by an 
instance'

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the 
user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailureRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : 
getFailureRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<Relationship, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && 
scriptResultType.isInstance(evaluatedValue)) {
+                                final Optional<Relationship> 
outgoingRelationship = 
resolveRelationship(scriptResultType.cast(evaluatedValue));
+
+                                if (outgoingRelationship.isPresent()) {
+                                    if 
(!recordSetFlowFileBuilders.containsKey(outgoingRelationship.get())) {
+                                        
recordSetFlowFileBuilders.put(outgoingRelationship.get(), new 
RecordBatchingProcessorFlowFileBuilder(incomingFlowFile, session, 
recordSetWriterFactory));
+                                    }
+
+                                    final int recordCount = 
recordSetFlowFileBuilders.get(outgoingRelationship.get()).addRecord(record);
+                                    session.adjustCounter("Record Processed", 
recordCount, false);

Review comment:
       Same comment as the other processor. Should call it 'Records' with an s 
- but the same logic error appears to be present here - the counter is being 
incremented n + (n-1) + (n-2) + ... times instead of n times.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedProcessor.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.script.ScriptingComponentHelper;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.search.SearchContext;
+import org.apache.nifi.search.SearchResult;
+import org.apache.nifi.search.Searchable;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+abstract class ScriptedProcessor extends AbstractProcessor implements 
Searchable {

Review comment:
       Recommend calling this `AbstractScriptedRecordProcessor` or something 
along those lines, in order to convey the intent fo this class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to