simonbence commented on a change in pull request #4948:
URL: https://github.com/apache/nifi/pull/4948#discussion_r664276040



##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/RecordBatchingProcessorFlowFileBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class contains all the information necessary to prepare an outgoing 
flow file.
+ */
+final class RecordBatchingProcessorFlowFileBuilder {
+    private final ProcessSession session;
+    private final FlowFile incomingFlowFile;
+    final private FlowFile outgoingFlowFile;
+    private final OutputStream out;
+    private final RecordSetWriter writer;
+    private final List<Map<String, String>> attributes = new LinkedList<>();
+
+    private int recordCount = 0;
+
+    RecordBatchingProcessorFlowFileBuilder(
+            final FlowFile incomingFlowFile,
+            final ProcessSession session,
+            final BiFunction<FlowFile, OutputStream, RecordSetWriter> 
recordSetWriterSupplier
+    ) throws IOException {
+        this.session = session;
+        this.incomingFlowFile = incomingFlowFile;
+        this.outgoingFlowFile = session.create(incomingFlowFile);
+        this.out = session.write(outgoingFlowFile);
+        this.writer = recordSetWriterSupplier.apply(outgoingFlowFile, out);
+        this.writer.beginRecordSet();
+    }
+
+    int addRecord(final Record record) throws IOException {
+        final WriteResult writeResult = writer.write(record);
+        attributes.add(writeResult.getAttributes());
+        recordCount += writeResult.getRecordCount();
+        return recordCount;
+    }
+
+    private Map<String, String> getWriteAttributes() {
+        final Map<String, String> result = new HashMap<>();
+        final Set<String> attributeNames = attributes.stream().map(a -> 
a.keySet()).flatMap(x -> x.stream()).collect(Collectors.toSet());

Review comment:
       Yes, good idea. (But to make it easier to follow, I will use 
`Set::stream` instead of `Collection::stream`.)

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile incomingFlowFile,
+            final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<String, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && evaluatedValue 
instanceof String) {

Review comment:
       Checking for null explicitly communicates my intention more clearly.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile incomingFlowFile,
+            final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<String, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };

Review comment:
       I considered this originally, but I decided against it. My key 
reasons: 1. this behaviour is specific to these two classes and not part of the 
more generic abstraction `ScriptedProcessor`, thus the abstraction would break. 
2. this is not a large enough amount of code to justify introducing an 
intermediate level of abstraction. (Note: originally these two classes were not 
separated.)

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile incomingFlowFile,
+            final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {

Review comment:
       As the implementation is quite big, this helps readability and keeps 
the boundaries clear.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;

Review comment:
       Could you please elaborate? If you are referring to the explicit 
assignment, that is intentional, to make the code easier to understand.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+                return;
+            }
+
+            success = partition(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : 
RELATIONSHIP_FAILURE);
+    }
+
+    private boolean partition(

Review comment:
       The bulk of the method is an anonymous implementation. Splitting the 
anonymous implementation into parts, as you suggest, might work, but from my 
perspective it would have more drawbacks than merits. If I extract the parts 
into the anonymous class as private methods, we do not gain much, and if I 
move the utility methods to the enclosing class as static ones, the logic 
becomes pretty fragmented. A lot happens here, and it would be better not to 
complicate it further by splitting (in this case, not in general). Also, 
please note that the implementations I added are loosely based on the already 
existing `ScriptedTransformRecord`, which I tried to follow in form and 
function as far as possible.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedProcessor.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.script.ScriptingComponentHelper;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.search.SearchContext;
+import org.apache.nifi.search.SearchResult;
+import org.apache.nifi.search.Searchable;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+abstract class ScriptedProcessor extends AbstractProcessor implements 
Searchable {
+    protected static final String PYTHON_SCRIPT_LANGUAGE = "python";
+    protected static final Set<String> SCRIPT_OPTIONS = 
ScriptingComponentUtils.getAvailableEngines();
+
+    protected volatile String scriptToRun = null;
+    protected final AtomicReference<CompiledScript> compiledScriptRef = new 
AtomicReference<>();
+    private final ScriptingComponentHelper scriptingComponentHelper = new 
ScriptingComponentHelper();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use parsing the incoming 
FlowFile into Records")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use for serializing Records 
after they have been transformed")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder()
+            .name("Script Engine")
+            .displayName("Script Language")
+            .description("The Language to use for the script")
+            .allowableValues(SCRIPT_OPTIONS)
+            .defaultValue("Groovy")
+            .required(true)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = 
Arrays.asList(
+            RECORD_READER,
+            RECORD_WRITER,
+            LANGUAGE,
+            ScriptingComponentUtils.SCRIPT_BODY,
+            ScriptingComponentUtils.SCRIPT_FILE,
+            ScriptingComponentUtils.MODULES);
+
+
+    @OnScheduled
+    public void setup(final ProcessContext context) throws IOException {
+        if (!scriptingComponentHelper.isInitialized.get()) {
+            scriptingComponentHelper.createResources(false);
+        }
+
+        scriptingComponentHelper.setupVariables(context);
+        scriptToRun = scriptingComponentHelper.getScriptBody();
+
+        if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != 
null) {
+            try (final FileInputStream scriptStream = new 
FileInputStream(scriptingComponentHelper.getScriptPath())) {
+                scriptToRun = IOUtils.toString(scriptStream, 
Charset.defaultCharset());
+            }
+        }
+
+        // Create a script runner for each possible task
+        final int maxTasks = context.getMaxConcurrentTasks();
+        scriptingComponentHelper.setupScriptRunners(maxTasks, scriptToRun, 
getLogger());
+
+        // Always compile when first run
+        compiledScriptRef.set(null);
+    }
+
+    protected ScriptEvaluator createEvaluator(final ScriptEngine scriptEngine, 
final FlowFile flowFile) throws ScriptException {
+        if 
(PYTHON_SCRIPT_LANGUAGE.equalsIgnoreCase(scriptEngine.getFactory().getLanguageName()))
 {
+            final CompiledScript compiledScript = 
getOrCompileScript((Compilable) scriptEngine, scriptToRun);
+            return new PythonScriptEvaluator(scriptEngine, compiledScript, 
flowFile, getLogger());
+        }
+
+        return new InterpretedScriptEvaluator(scriptEngine, scriptToRun, 
flowFile, getLogger());
+    }
+
+    private CompiledScript getOrCompileScript(final Compilable scriptEngine, 
final String scriptToRun) throws ScriptException {
+        final CompiledScript existing = compiledScriptRef.get();
+        if (existing != null) {
+            return existing;
+        }
+
+        final CompiledScript compiled = scriptEngine.compile(scriptToRun);
+        final boolean updated = compiledScriptRef.compareAndSet(null, 
compiled);
+        if (updated) {
+            return compiled;
+        }
+
+        return compiledScriptRef.get();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        return scriptingComponentHelper.customValidate(validationContext);
+    }
+
+    @Override
+    public Collection<SearchResult> search(final SearchContext context) {
+        return ScriptingComponentUtils.search(context, getLogger());
+    }
+
+    protected static Bindings setupBindings(final ScriptEngine scriptEngine) {
+        Bindings bindings = 
scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
+        if (bindings == null) {
+            bindings = new SimpleBindings();
+        }
+
+        scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
+
+        return bindings;
+    }
+
+    protected ScriptRunner pollScriptRunner() {
+        return scriptingComponentHelper.scriptRunnerQ.poll();
+    }
+
+    protected void offerScriptRunner(ScriptRunner scriptRunner) {
+        scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);

Review comment:
       In theory, this is a possibility. If the `BlockingQueue` were full, 
`offer` would return `false`; even worse, we would lose the instance, as we 
would no longer hold a reference to it. Luckily, the script runner logic is 
built in a way that it works with a predefined set of runners, which are 
`poll`-ed and, after usage, `offer`-ed back to the queue. The situation you 
are describing would assume that a new instance is being created somewhere, 
which is not the case.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouteRecord.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+@Tags({"record", "routing", "script", "groovy", "jython", "python", "segment", 
"split", "group", "organize"})
+@CapabilityDescription(
+        "This processor provides the ability to route the records of the 
incoming FlowFile using an user-provided script. " +
+        "The script is expected to handle a record as argument and return with 
a string value. " +
+        "The returned value defines a route. All routes are bounded to an 
outgoing relationship where the record will be transferred to. " +
+        "Relationships are defined as dynamic properties: dynamic property 
names are serving as the name of the route. " +
+        "The value of a dynamic property defines the relationship the given 
record will be routed into. Multiple routes might point to the same 
relationship. " +
+        "Creation of these dynamic relationship is managed by the processor. " 
+
+        "The records, which for the script returned with an unknown 
relationship name are routed to the \"unmatched\" relationship. " +
+        "The records are batched: for an incoming FlowFile, all the records 
routed towards a given relationship are batched into one single FlowFile."
+)
+@SeeAlso(classNames = {
+    "org.apache.nifi.processors.script.ScriptedTransformRecord",
+    "org.apache.nifi.processors.script.ScriptedPartitionRecord",
+    "org.apache.nifi.processors.script.ScriptedValidateRecord",
+    "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+@DynamicRelationship(name = "Name from Dynamic Property", description = 
"FlowFiles that match the Dynamic Property's Attribute Expression Language")
+public class ScriptedRouteRecord extends ScriptedRouterProcessor<String> {
+
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description(
+                "After successful procession, the incoming FlowFile will be 
transferred to this relationship. " +
+                "This happens regardless the records are matching to a 
relationship or not.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failed")
+            .description("In case of any issue during processing the incoming 
FlowFile, the incoming FlowFile will be routed to this relationship.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNMATCHED = new 
Relationship.Builder()
+            .name("unmatched")
+            .description("Records where the script evaluation returns with an 
unknown partition are routed to this relationship.")
+            .build();
+
+    private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+        RELATIONSHIPS.add(RELATIONSHIP_UNMATCHED);
+    }
+
+    private final AtomicReference<Set<Relationship>> relationships = new 
AtomicReference<>();

Review comment:
       Unlike most processors, this processor makes it possible to add routes 
dynamically. As processor instances might be called from multiple threads, we 
need to ensure thread safety. As for your second question: I do not see the 
value of the renaming.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java
##########
@@ -17,7 +17,6 @@
 

Review comment:
       - As for points 1 and 2, the same applies as above.
   - For point 3: I am not sure which ones you are referring to, but as for 
the ones in lines 254, 262 and 270: the content of the array is different in 
every case, thus we cannot do without them.
   - `RuntimeException`: in general I agree with you, and it could be a good 
point for later improvement, but as this ticket adds a bulk of new 
functionality, I would find it unfortunate to functionally change already 
existing behaviour. That easily ends up with regression issues.

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the 
user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailureRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : 
getFailureRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<Relationship, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = 
evaluator.evaluate(record, index++);
+                            getLogger().debug("Evaluated scripted against {} 
(index {}), producing result of {}", record, index - 1, evaluatedValue);
+
+                            if (evaluatedValue != null && 
scriptResultType.isInstance(evaluatedValue)) {

Review comment:
       That is correct, and just like above I prefer to show the intent 
explicitly

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the 
user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailureRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : 
getFailureRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {

Review comment:
       Same as above

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the 
user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                final ScriptEngine scriptEngine = 
scriptRunner.getScriptEngine();
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailureRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptRunner(scriptRunner);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : 
getFailureRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = 
incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = 
readerFactory.createRecordReader(originalAttributes, in, 
incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = 
writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+                        final Map<Relationship, 
RecordBatchingProcessorFlowFileBuilder> recordSetFlowFileBuilders = new 
HashMap<>();
+                        final BiFunction<FlowFile, OutputStream, 
RecordSetWriter> recordSetWriterFactory = (outgoingFlowFile, out) -> {
+                            try {
+                                return writerFactory.createWriter(getLogger(), 
schema, out, outgoingFlowFile);
+                            } catch (final IOException | 
SchemaNotFoundException e) {
+                                throw new ProcessException("Could not create 
RecordSetWriter", e);
+                            }
+                        };
+
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {

Review comment:
       As I argued above, splitting the methods of the anonymous 
implementations would make this part harder to read, and because of this I 
prefer to leave it as it is. (However, I do understand that the cyclomatic 
complexity is high.)

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the 
user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;

Review comment:
       Please see above

##########
File path: 
nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+@EventDriven
+@SideEffectFree
+@Tags({"record", "partition", "script", "groovy", "jython", "python", 
"segment", "split", "group", "organize"})
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be 
read by the configured Record Reader) and evaluates the user provided script 
against "
+        + "each record in the incoming flow file. Each record is then grouped 
with other records sharing the same partition and a FlowFile is created for 
each groups of records. " +
+        "Two records shares the same partition if the evaluation of the script 
results the same return value for both. Those will be considered as part of the 
same partition.")
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute 
arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "partition", description = "The partition 
of the outgoing flow file."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records within the flow file."),
+        @WritesAttribute(attribute = "record.error.message", description = 
"This attribute provides on failure the error message encountered by the Reader 
or Writer."),
+        @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count", description = "The 
number of partitioned FlowFiles generated from the parent FlowFile")
+})
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedRouteRecord",
+        "org.apache.nifi.processors.script.ScriptedValidateRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord"
+})
+public class ScriptedPartitionRecord extends ScriptedProcessor {
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+            .build();
+    static final Relationship RELATIONSHIP_ORIGINAL = new 
Relationship.Builder()
+            .name("original")
+            .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+            .build();
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be partitioned from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship")
+            .build();
+
+    private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
+
+    static {
+        RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
+        RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
+        RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptRunner scriptRunner = pollScriptRunner();
+        if (scriptRunner == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;

Review comment:
       I consider that a different case. Here I see value in verbosity at this 
level.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to