GitHub user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2833#discussion_r210065027
--- Diff:
nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jolt.record;
+
+import com.bazaarvoice.jolt.ContextualTransform;
+import com.bazaarvoice.jolt.JoltTransform;
+import com.bazaarvoice.jolt.JsonUtils;
+import com.bazaarvoice.jolt.Transform;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.jolt.record.util.TransformFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr",
"removr", "cardinality", "sort"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+ @WritesAttribute(attribute = "record.count", description = "The
number of records in an outgoing FlowFile"),
+ @WritesAttribute(attribute = "mime.type", description = "The MIME
Type that the configured Record Writer indicates is appropriate"),
+})
+@CapabilityDescription("Applies a list of Jolt specifications to the
FlowFile payload. A new FlowFile is created "
+ + "with transformed content and is routed to the 'success'
relationship. If the transform "
+ + "fails, the original FlowFile is routed to the 'failure'
relationship.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description
= "If the Jolt transform is applied to the entire record set, memory issues can
occur "
+ + "for large record sets.")
+public class JoltTransformRecord extends AbstractProcessor {
+
+ // Jolt transformation types selectable through the JOLT_TRANSFORM property.
+ // Descriptions are shown to users, so none of them carries leading whitespace.
+ static final AllowableValue SHIFTR
+ = new AllowableValue("jolt-transform-shift", "Shift", "Shift input data to create the output.");
+ static final AllowableValue CHAINR
+ = new AllowableValue("jolt-transform-chain", "Chain", "Execute list of Jolt transformations.");
+ static final AllowableValue DEFAULTR
+ = new AllowableValue("jolt-transform-default", "Default", "Apply default values to the output.");
+ static final AllowableValue REMOVR
+ = new AllowableValue("jolt-transform-remove", "Remove", "Remove values from input data to create the output.");
+ static final AllowableValue CARDINALITY
+ = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output.");
+ static final AllowableValue SORTR
+ = new AllowableValue("jolt-transform-sort", "Sort", "Sort input field name values alphabetically. Any specification set is ignored.");
+ static final AllowableValue CUSTOMR
+ = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name");
+ static final AllowableValue MODIFIER_DEFAULTR
+ = new AllowableValue("jolt-transform-modify-default", "Modify - Default", "Writes when field name is missing or value is null");
+ static final AllowableValue MODIFIER_OVERWRITER
+ = new AllowableValue("jolt-transform-modify-overwrite", "Modify - Overwrite", "Always overwrite value");
+ static final AllowableValue MODIFIER_DEFINER
+ = new AllowableValue("jolt-transform-modify-define", "Modify - Define", "Writes when key is missing");
+
+ // Values for the TRANSFORM_STRATEGY property: apply the spec once to the whole record set, or once per record.
+ static final AllowableValue APPLY_TO_RECORD_SET
+ = new AllowableValue("jolt-record-apply-recordset", "Entire Record Set", "Applies the transformation to the record set as a whole. Used when "
+ + "values from multiple records are needed in the transformation.");
+ static final AllowableValue APPLY_TO_RECORDS
+ = new AllowableValue("jolt-record-apply-records", "Each Record", "Applies the transformation to each record individually.");
+
+
+ // Required Record Reader controller service: parses incoming FlowFile content and determines its schema.
+ static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("jolt-record-record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
parsing incoming data and determining the data's schema.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ // Required Record Writer controller service used to serialize the transformed records.
+ static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("jolt-record-record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for
writing out the records")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ // Which Jolt transform type to build from the spec; defaults to Chain.
+ static final PropertyDescriptor JOLT_TRANSFORM = new
PropertyDescriptor.Builder()
+ .name("jolt-record-transform")
+ .displayName("Jolt Transformation DSL")
+ .description("Specifies the Jolt Transformation that should be
used with the provided specification.")
+ .required(true)
+ .allowableValues(CARDINALITY, CHAINR, DEFAULTR,
MODIFIER_DEFAULTR, MODIFIER_DEFINER, MODIFIER_OVERWRITER, REMOVR, SHIFTR,
SORTR, CUSTOMR)
+ .defaultValue(CHAINR.getValue())
+ .build();
+
+ // The Jolt spec itself. It is optional at the descriptor level; customValidate reports it as
+ // missing unless the Sort transform is selected. FlowFile-attribute Expression Language is
+ // supported, so the effective spec can differ per FlowFile.
+ static final PropertyDescriptor JOLT_SPEC = new
PropertyDescriptor.Builder()
+ .name("jolt-record-spec")
+ .displayName("Jolt Specification")
+ .description("Jolt Specification for transform of record data.
This value is ignored if the Jolt Sort Transformation is selected.")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
+
+ // Fully qualified class name of a user-supplied transform (the Custom transform type requires it).
+ static final PropertyDescriptor CUSTOM_CLASS = new
PropertyDescriptor.Builder()
+ .name("jolt-record-custom-class")
+ .displayName("Custom Transformation Class Name")
+ .description("Fully Qualified Class Name for Custom
Transformation")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Comma-separated extra paths (files/directories) searched for custom transform classes not on NiFi's classpath.
+ static final PropertyDescriptor MODULES = new
PropertyDescriptor.Builder()
+ .name("jolt-record-custom-modules")
+ .displayName("Custom Module Directory")
+ .description("Comma-separated list of paths to files and/or
directories which contain modules containing custom transformations (that are
not included on NiFi's classpath).")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Whether the spec is applied once to the whole record set or once per record. Applying it to the
+ // entire record set can cause memory pressure for large record sets (see the class-level
+ // SystemResourceConsideration).
+ static final PropertyDescriptor TRANSFORM_STRATEGY = new PropertyDescriptor.Builder()
+ .name("jolt-record-transform-strategy")
+ .displayName("Transformation Strategy")
+ .description("Specifies whether the transform should be applied to the entire record set or to each individual record. Note that when the transform is applied to "
+ + "the entire record set, the first element in the spec should be an asterisk (*) in order to match each record.")
+ .required(true)
+ .allowableValues(APPLY_TO_RECORD_SET, APPLY_TO_RECORDS)
+ .defaultValue(APPLY_TO_RECORDS.getValue())
+ .build();
+
+ // Maximum number of compiled Jolt transforms kept in memory (positive integer, default 1). A larger
+ // value helps when Expression Language in the spec yields different transforms per FlowFile.
+ static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+ .name("jolt-record-transform-cache-size")
+ .displayName("Transform Cache Size")
+ .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+ + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .required(true)
+ .build();
+
+ // Destination for FlowFiles whose records were transformed successfully.
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("The FlowFile with transformed content will be
routed to this relationship")
+ .build();
+ // Destination for FlowFiles that could not be processed (e.g. unparseable records).
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile fails processing for any reason
(for example, the FlowFile records cannot be parsed), it will be routed to this
relationship")
+ .build();
+
+ // Immutable property/relationship collections, assigned exactly once in the static initializer.
+ private final static List<PropertyDescriptor> properties;
+ private final static Set<Relationship> relationships;
+ // NOTE(review): presumably built from MODULES to load CUSTOM_CLASS; its assignment is not visible in this hunk.
+ private volatile ClassLoader customClassLoader;
+ private final static String DEFAULT_CHARSET = "UTF-8";
+
+ // Cache is guarded by synchronizing on 'this'.
+ // Upper bound consulted by removeEldestEntry below.
+ // NOTE(review): the initial value 10 differs from the TRANSFORM_CACHE_SIZE default of "1";
+ // presumably it is overwritten from that property when the processor is scheduled. Confirm.
+ private volatile int maxTransformsToCache = 10;
+ // Compiled transforms keyed by a String key. The no-arg LinkedHashMap keeps insertion order, so
+ // once size() exceeds maxTransformsToCache the oldest *inserted* entry is evicted (FIFO, not LRU).
+ private final Map<String, JoltTransform> transformCache = new
LinkedHashMap<String, JoltTransform>() {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String,
JoltTransform> eldest) {
+ final boolean evict = size() > maxTransformsToCache;
+ if (evict) {
+ getLogger().debug("Removing Jolt Transform from cache
because cache is full");
+ }
+ return evict;
+ }
+ };
+
+ static {
+ // Supported properties, in a fixed order, exposed as a read-only view.
+ properties = Collections.unmodifiableList(Arrays.asList(
+ RECORD_READER,
+ RECORD_WRITER,
+ JOLT_TRANSFORM,
+ CUSTOM_CLASS,
+ MODULES,
+ JOLT_SPEC,
+ TRANSFORM_STRATEGY,
+ TRANSFORM_CACHE_SIZE));
+
+ // The two routing targets, exposed as a read-only set.
+ relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+ }
+
+ /**
+ * Returns the immutable set of relationships (success and failure) built in the static initializer.
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ /**
+ * Returns the immutable, ordered list of supported properties built in the static initializer.
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+ final String transform =
validationContext.getProperty(JOLT_TRANSFORM).getValue();
+ final String customTransform =
validationContext.getProperty(CUSTOM_CLASS).getValue();
+ final String modulePath =
validationContext.getProperty(MODULES).isSet() ?
validationContext.getProperty(MODULES).getValue() : null;
+
+ if (!validationContext.getProperty(JOLT_SPEC).isSet() ||
StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
+ if (!SORTR.getValue().equals(transform)) {
+ final String message = "A specification is required for
this transformation";
+ results.add(new ValidationResult.Builder().valid(false)
+ .explanation(message)
+ .build());
+ }
+ } else {
+ final ClassLoader customClassLoader;
+
+ try {
+ if (modulePath != null) {
--- End diff --
This was copied and pasted from the original Jolt processor :) I'll replace it as
you suggested and make sure everything still works.
---