Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2711#discussion_r190750155 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import org.apache.commons.text.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.Collections; +import java.util.Arrays; +import java.util.ArrayList; + + + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"csv", "attributes", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. 
The resulting CSV " + + "can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. " + + "If the attribute value contains a comma, newline or double quote, then the attribute value will be " + + "escaped with double quotes. Any double quote characters in the attribute value are escaped with " + + "another double quote.") +@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes") +public class AttributesToCSV extends AbstractProcessor { + private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes"; + private static final String OUTPUT_SEPARATOR = ","; + private static final String OUTPUT_MIME_TYPE = "text/csv"; + private static final String SPLIT_REGEX = OUTPUT_SEPARATOR + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"; + + static final AllowableValue OUTPUT_OVERWRITE_CONTENT = new AllowableValue("flowfile-content", "flowfile-content", "The resulting CSV string will be placed into the content of the flowfile." + + "Existing flowfile context will be overwritten. 'CSVAttributes' will not be written to at all (neither null nor empty string)."); + static final AllowableValue OUTPUT_NEW_ATTRIBUTE= new AllowableValue("flowfile-attribute", "flowfile-attribute", "The resulting CSV string will be placed into a new flowfile" + + " attribute named 'CSVAttributes'. The content of the flowfile will not be changed."); + + public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() + .name("attribute-list") + .displayName("Attribute List") + .description("Comma separated list of attributes to be included in the resulting CSV. If this value " + + "is left empty then all existing Attributes will be included. This list of attributes is " + + "case sensitive and supports attribute names that contain commas. 
If an attribute specified in the list is not found it will be emitted " + + "to the resulting CSV with an empty string or null depending on the 'Null Value' property. " + + "If a core attribute is specified in this list " + + "and the 'Include Core Attributes' property is false, the core attribute will be included. The attribute list " + + "ALWAYS wins.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ATTRIBUTES_REGEX = new PropertyDescriptor.Builder() + .name("attributes-regex") + .displayName("Attributes Regular Expression") + .description("Regular expression that will be evaluated against the flow file attributes to select " + + "the matching attributes. This property can be used in combination with the attributes " + + "list property. The final output will contain a combination of matches found in the ATTRIBUTE_LIST and ATTRIBUTE_REGEX.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("destination") + .displayName("Destination") + .description("Control if CSV value is written as a new flowfile attribute 'CSVAttributes' " + + "or written in the flowfile content.") + .required(true) + .allowableValues(OUTPUT_NEW_ATTRIBUTE, OUTPUT_OVERWRITE_CONTENT) + .defaultValue(OUTPUT_NEW_ATTRIBUTE.getDisplayName()) + .build(); + + public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("include-core-attributes") + .displayName("Include Core Attributes") + .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes, which are " + + 
"contained in every FlowFile, should be included in the final CSV value generated. The Attribute List property " + + "overrides this setting.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder() + .name("null-value") + .displayName("Null Value") + .description("If true a non existing or empty attribute will be 'null' in the resulting CSV. If false an empty " + + "string will be placed in the CSV") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully converted attributes to CSV").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed to convert attributes to CSV").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private volatile Boolean includeCoreAttributes; + private volatile Set<String> coreAttributes; + private volatile boolean destinationContent; + private volatile boolean nullValForEmptyString; + private volatile Pattern pattern; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(ATTRIBUTES_LIST); + properties.add(ATTRIBUTES_REGEX); + properties.add(DESTINATION); + properties.add(INCLUDE_CORE_ATTRIBUTES); + properties.add(NULL_VALUE_FOR_EMPTY_STRING); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + 
@Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + + private Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Pattern attPattern) { + Map<String, String> result; + Map<String, String> ffAttributes = ff.getAttributes(); + result = new LinkedHashMap<>(ffAttributes.size()); + + if (!attributes.isEmpty() || attPattern != null) { + if (!attributes.isEmpty()) { + //the user gave a list of attributes + for (String attribute : attributes) { + String val = ff.getAttribute(attribute); + if (val != null && !val.isEmpty()) { + result.put(attribute, val); + } else { + if (nullValForEmptyString) { + result.put(attribute, "null"); + } else { + result.put(attribute, ""); + } + } + } + } + if(attPattern != null) { + for (Map.Entry<String, String> e : ff.getAttributes().entrySet()) { + if(attPattern.matcher(e.getKey()).matches()) { + result.put(e.getKey(), e.getValue()); + } + } + } + } else { + //the user did not give a list of attributes, take all the attributes from the flowfile + result.putAll(ffAttributes); + } + + //now glue on the core attributes if the user wants them. + if(includeCoreAttributes) { + for (String coreAttribute : coreAttributes) { + //make sure this coreAttribute is applicable to this flowfile. + String val = ff.getAttribute(coreAttribute); + if(ffAttributes.containsKey(coreAttribute)) { + if (!StringUtils.isEmpty(val)){ + result.put(coreAttribute, val); + } else { + if (nullValForEmptyString) { + result.put(coreAttribute, "null"); + } else { + result.put(coreAttribute, ""); + } + } + } + } + } else { + //remove core attributes since the user does not want them, unless they are in the attribute list. 
Attribute List always wins + for (String coreAttribute : coreAttributes) { + //never override user specified attributes, even if the user has selected to exclude core attributes + if(!attributes.contains(coreAttribute)) { + result.remove(coreAttribute); + } + } + } + return result; + } + + private LinkedHashSet<String> attributeListStringToSet(String attributeList) { + //take the user specified attribute list string and convert to list of strings. + LinkedHashSet<String> result = new LinkedHashSet<>(); + if (StringUtils.isNotBlank(attributeList)) { + String[] ats = attributeList.split(SPLIT_REGEX); + for (String str : ats) { + result.add(StringEscapeUtils.unescapeCsv(str.trim())); + } + } + return result; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + includeCoreAttributes = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(); + coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet()); + destinationContent = OUTPUT_OVERWRITE_CONTENT.getValue().equals(context.getProperty(DESTINATION).getValue()); + nullValForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { + return; + } + if(context.getProperty(ATTRIBUTES_REGEX).isSet()) { + pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions(original).getValue()); --- End diff -- `pattern` is only used locally within onTrigger, so it should be a local variable rather than a field/instance member (same as attributeList). Reassigning a shared field on every trigger also lets concurrently running tasks overwrite each other's compiled pattern.
---