Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2833#discussion_r210064919
--- Diff:
nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jolt.record;
+
+import com.bazaarvoice.jolt.ContextualTransform;
+import com.bazaarvoice.jolt.JoltTransform;
+import com.bazaarvoice.jolt.JsonUtils;
+import com.bazaarvoice.jolt.Transform;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.jolt.record.util.TransformFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr",
"removr", "cardinality", "sort"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+ @WritesAttribute(attribute = "record.count", description = "The
number of records in an outgoing FlowFile"),
+ @WritesAttribute(attribute = "mime.type", description = "The MIME
Type that the configured Record Writer indicates is appropriate"),
+})
+@CapabilityDescription("Applies a list of Jolt specifications to the
FlowFile payload. A new FlowFile is created "
+ + "with transformed content and is routed to the 'success'
relationship. If the transform "
+ + "fails, the original FlowFile is routed to the 'failure'
relationship.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description
= "If the Jolt transform is applied to the entire record set, memory issues can
occur "
+ + "for large record sets.")
+public class JoltTransformRecord extends AbstractProcessor {
+
+    // Jolt transformation DSL choices offered by the JOLT_TRANSFORM property.
+    // The first constructor argument is the persisted value and must not change; descriptions are user-facing.
+    static final AllowableValue SHIFTR
+            = new AllowableValue("jolt-transform-shift", "Shift", "Shift input data to create the output.");
+    static final AllowableValue CHAINR
+            = new AllowableValue("jolt-transform-chain", "Chain", "Execute list of Jolt transformations.");
+    static final AllowableValue DEFAULTR
+            = new AllowableValue("jolt-transform-default", "Default", "Apply default values to the output.");
+    static final AllowableValue REMOVR
+            = new AllowableValue("jolt-transform-remove", "Remove", "Remove values from input data to create the output.");
+    static final AllowableValue CARDINALITY
+            = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output.");
+    static final AllowableValue SORTR
+            = new AllowableValue("jolt-transform-sort", "Sort", "Sort input field name values alphabetically. Any specification set is ignored.");
+    static final AllowableValue CUSTOMR
+            = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name");
+    static final AllowableValue MODIFIER_DEFAULTR
+            = new AllowableValue("jolt-transform-modify-default", "Modify - Default", "Writes when field name is missing or value is null");
+    static final AllowableValue MODIFIER_OVERWRITER
+            = new AllowableValue("jolt-transform-modify-overwrite", "Modify - Overwrite", "Always overwrite value");
+    static final AllowableValue MODIFIER_DEFINER
+            = new AllowableValue("jolt-transform-modify-define", "Modify - Define", "Writes when key is missing");
+
+    // Transformation strategy choices offered by the TRANSFORM_STRATEGY property.
+    static final AllowableValue APPLY_TO_RECORD_SET
+            = new AllowableValue("jolt-record-apply-recordset", "Entire Record Set", "Applies the transformation to the record set as a whole. Used when "
+            + "values from multiple records are needed in the transformation.");
+    static final AllowableValue APPLY_TO_RECORDS
+            = new AllowableValue("jolt-record-apply-records", "Each Record", "Applies the transformation to each record individually.");
+
+
+
+    /** Record Reader controller service used to parse the incoming FlowFile content (required). */
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("jolt-record-record-reader")
+            .displayName("Record Reader")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .build();
+
+    /** Record Writer controller service used to write out the transformed records (required). */
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("jolt-record-record-writer")
+            .displayName("Record Writer")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .description("Specifies the Controller Service to use for writing out the records")
+            .build();
+
+    /** Which Jolt transformation DSL to apply; one of the transformation AllowableValues, defaulting to Chain. */
+    static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jolt-record-transform")
+            .displayName("Jolt Transformation DSL")
+            .required(true)
+            .allowableValues(CARDINALITY, CHAINR, DEFAULTR, MODIFIER_DEFAULTR, MODIFIER_DEFINER,
+                    MODIFIER_OVERWRITER, REMOVR, SHIFTR, SORTR, CUSTOMR)
+            .defaultValue(CHAINR.getValue())
+            .description("Specifies the Jolt Transformation that should be used with the provided specification.")
+            .build();
+
+    /**
+     * The Jolt specification. Supports FlowFile-attribute Expression Language. Not required at the descriptor
+     * level; customValidate enforces its presence for every transformation except Sort.
+     */
+    static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
+            .name("jolt-record-spec")
+            .displayName("Jolt Specification")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .description("Jolt Specification for transform of record data. This value is ignored if the Jolt Sort Transformation is selected.")
+            .build();
+
+    /** Fully qualified class name of the transformation used when the Custom DSL is selected. */
+    static final PropertyDescriptor CUSTOM_CLASS = new PropertyDescriptor.Builder()
+            .name("jolt-record-custom-class")
+            .displayName("Custom Transformation Class Name")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Fully Qualified Class Name for Custom Transformation")
+            .build();
+
+    /** Extra files/directories searched for custom transformation classes that are not on NiFi's classpath. */
+    static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
+            .name("jolt-record-custom-modules")
+            .displayName("Custom Module Directory")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
+            .build();
+
+
+    /**
+     * Selects whether the Jolt transform runs once over the entire record set or once per record.
+     * Defaults to applying it to each record individually.
+     */
+    static final PropertyDescriptor TRANSFORM_STRATEGY = new PropertyDescriptor.Builder()
+            .name("jolt-record-transform-strategy")
+            .displayName("Transformation Strategy")
+            .description("Specifies whether the transform should be applied to the entire record set or to each individual record. Note that when the transform is applied to "
+                    + "the entire record set, the first element in the spec should be an asterisk (*) in order to match each record.")
+            .required(true)
+            .allowableValues(APPLY_TO_RECORD_SET, APPLY_TO_RECORDS)
+            .defaultValue(APPLY_TO_RECORDS.getValue())
+            .build();
+
+    /**
+     * Maximum number of compiled Jolt transforms to cache. Must be a positive integer; defaults to "1".
+     * Expression Language is not supported for this property.
+     */
+    static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jolt-record-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .build();
+
+    /** Destination for the newly created FlowFile holding the transformed records. */
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+
+    /** Destination for FlowFiles that could not be processed, e.g. because their records could not be parsed. */
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship")
+            .build();
+
+    // Supported properties and relationships; both are assigned exactly once in the static initializer.
+    private static final List<PropertyDescriptor> properties;
+    private static final Set<Relationship> relationships;
+    // Charset name passed to JsonUtils when parsing the Jolt specification.
+    private static final String DEFAULT_CHARSET = "UTF-8";
+
+    // When non-null, onTrigger installs this as the thread's context class loader before transforming.
+    // NOTE(review): not assigned in the visible code; presumably set when the processor is scheduled — confirm.
+    private volatile ClassLoader customClassLoader;
+
+    // Cache is guarded by synchronizing on 'this'.
+    // NOTE(review): this initial value (10) differs from TRANSFORM_CACHE_SIZE's default of "1"; presumably it is
+    // overwritten from that property before use — confirm.
+    private volatile int maxTransformsToCache = 10;
+
+    // Insertion-ordered map that drops its eldest entry whenever it grows beyond maxTransformsToCache entries.
+    private final Map<String, JoltTransform> transformCache = new LinkedHashMap<String, JoltTransform>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> eldest) {
+            if (size() <= maxTransformsToCache) {
+                return false;
+            }
+            getLogger().debug("Removing Jolt Transform from cache because cache is full");
+            return true;
+        }
+    };
+
+    // Builds the immutable property list (in UI display order) and relationship set once per class load.
+    static {
+        properties = Collections.unmodifiableList(Arrays.asList(
+                RECORD_READER,
+                RECORD_WRITER,
+                JOLT_TRANSFORM,
+                CUSTOM_CLASS,
+                MODULES,
+                JOLT_SPEC,
+                TRANSFORM_STRATEGY,
+                TRANSFORM_CACHE_SIZE));
+
+        relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+    }
+
+    /**
+     * Returns the relationships this processor routes to.
+     *
+     * @return the unmodifiable set built in the static initializer (success and failure)
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return JoltTransformRecord.relationships;
+    }
+
+    /**
+     * Returns the properties this processor exposes.
+     *
+     * @return the unmodifiable, ordered list built in the static initializer
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return JoltTransformRecord.properties;
+    }
+
+    /**
+     * Validates the combination of transformation DSL, Jolt specification, custom class name and module path.
+     * <p>
+     * A specification is required for every transformation except Sort. If the specification contains
+     * Expression Language, only the expression syntax is checked. Otherwise the transform is actually built
+     * (with "${" sequences escaped so variables are not resolved) to prove the specification is usable.
+     * The Custom transformation additionally requires a class name. That check runs independently of the
+     * specification, so it is also reported when the specification is empty or uses Expression Language.
+     *
+     * @param validationContext supplies the configured property values
+     * @return the superclass results plus any problems found here
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
+        final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
+        final String modulePath = validationContext.getProperty(MODULES).isSet() ? validationContext.getProperty(MODULES).getValue() : null;
+
+        // A Custom transformation can never be built without a class name, whatever the spec looks like.
+        final boolean customClassMissing = CUSTOMR.getValue().equals(transform) && StringUtils.isEmpty(customTransform);
+        if (customClassMissing) {
+            results.add(new ValidationResult.Builder().valid(false)
+                    .subject(CUSTOM_CLASS.getDisplayName())
+                    .explanation("A custom transformation class should be provided. ")
+                    .build());
+        }
+
+        if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
+            if (!SORTR.getValue().equals(transform)) {
+                final String message = "A specification is required for this transformation";
+                results.add(new ValidationResult.Builder().valid(false)
+                        .subject(JOLT_SPEC.getDisplayName())
+                        .explanation(message)
+                        .build());
+            }
+        } else {
+            try {
+                // Distinct name so the customClassLoader field used by onTrigger is not shadowed.
+                final ClassLoader validationClassLoader;
+                if (modulePath != null) {
+                    validationClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
+                } else {
+                    validationClassLoader = this.getClass().getClassLoader();
+                }
+
+                final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
+
+                if (validationContext.isExpressionLanguagePresent(specValue)) {
+                    final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
+                    if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+                        results.add(new ValidationResult.Builder().valid(false)
+                                .subject(JOLT_SPEC.getDisplayName())
+                                .explanation("Invalid Expression Language: " + invalidExpressionMsg)
+                                .build());
+                    }
+                } else {
+                    // For validation we only need the spec to be syntactically correct, so "${" is escaped
+                    // rather than resolved: the referenced variables may not exist yet.
+                    final Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
+
+                    if (CUSTOMR.getValue().equals(transform)) {
+                        // A missing class name was already reported above; only try to build when one is given.
+                        if (!customClassMissing) {
+                            TransformFactory.getCustomTransform(validationClassLoader, customTransform, specJson);
+                        }
+                    } else {
+                        TransformFactory.getTransform(validationClassLoader, transform, specJson);
+                    }
+                }
+            } catch (final Exception e) {
+                getLogger().info("Processor is not valid - " + e.toString());
+                String message = "Specification not valid for the selected transformation.";
+                results.add(new ValidationResult.Builder().valid(false)
+                        .subject(JOLT_SPEC.getDisplayName())
+                        .explanation(message)
+                        .build());
+            }
+        }
+
+        return results;
+    }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onTrigger(final ProcessContext context, ProcessSession
session) throws ProcessException {
+ final FlowFile original = session.get();
+ if (original == null) {
+ return;
+ }
+
+ final ComponentLog logger = getLogger();
+ final StopWatch stopWatch = new StopWatch(true);
+
+ final String transformStrategy =
context.getProperty(TRANSFORM_STRATEGY).getValue();
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ final RecordSchema schema;
+ final ClassLoader originalContextClassLoader =
Thread.currentThread().getContextClassLoader();
+ try (final InputStream in = session.read(original);
+ final RecordReader reader =
readerFactory.createRecordReader(original, in, getLogger())) {
+ schema = writerFactory.getSchema(original.getAttributes(),
reader.getSchema());
+ Record record;
+
+ FlowFile transformed = session.create(original);
+ final Map<String, String> attributes = new HashMap<>();
+ final WriteResult writeResult;
+ try (final OutputStream out = session.write(transformed);
+ final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), schema, out)) {
+
+ final JoltTransform transform = getTransform(context,
original);
+ if (customClassLoader != null) {
+
Thread.currentThread().setContextClassLoader(customClassLoader);
+ }
+
+ if
(APPLY_TO_RECORD_SET.getValue().equals(transformStrategy)) {
+ List<Map<String, Object>> recordList = new
ArrayList<>();
+ final RecordSet transformedRecordSet;
+ while ((record = reader.nextRecord()) != null) {
+
+ Map<String, Object> recordMap = (Map<String,
Object>) DataTypeUtils.convertRecordFieldtoObject(record,
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ // JOLT expects arrays to be of type List where
our Record code uses Object[].
+ // Make another pass of the transformed objects to
change Object[] to List.
+ recordMap = (Map<String, Object>)
normalizeJoltObjects(recordMap);
+ recordList.add(recordMap);
--- End diff --
That makes sense. I'll take another look and make sure the documentation reflects the
idea of a parent record containing an array, for the case where you want to apply the
transform over the whole array. As you said, that will make the two modes
unnecessary.
---