[ https://issues.apache.org/jira/browse/NIFI-1705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941068#comment-15941068 ]
ASF GitHub Bot commented on NIFI-1705: -------------------------------------- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1589#discussion_r107990719 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + + +import java.io.BufferedOutputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.Map; +import java.util.Collections; +import java.util.stream.Collectors; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"csv", "attributes", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. The resulting CSV " + + "can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. " + + "If the attribute value contains a comma, newline or double quote, then the attribute value will be " + + "escaped with double quotes. Any double quote characters in the attribute value are escaped with " + + "another double quote. If the attribute value does not contain a comma, newline or double quote, then the " + + "attribute value is returned unchanged.") +@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes") +public class AttributesToCSV extends AbstractProcessor { + + private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute"; + private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content"; + private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes"; + private static final String OUTPUT_SEPARATOR = ","; + private static final String OUTPUT_MIME_TYPE = "text/csv"; + + + public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() + .name("attribute-list") + .displayName("Attribute List") + .description("Comma separated list of attributes to be included in the resulting CSV. If this value " + + "is left empty then all existing Attributes will be included. This list of attributes is " + + "case sensitive. If an attribute specified in the list is not found it will be emitted " + + "to the resulting CSV with an empty string or null depending on the 'Null Value' property. " + + "If a core attribute is specified in this list " + + "and the 'Include Core Attributes' property is false, the core attribute will be included. The attribute list " + + "ALWAYS wins.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("destination") + .displayName("Destination") + .description("Control if CSV value is written as a new flowfile attribute 'CSVAttributes' " + + "or written in the flowfile content. Writing to flowfile content will overwrite any " + + "existing flowfile content.") + .required(true) + .allowableValues(OUTPUT_NEW_ATTRIBUTE, OUTPUT_OVERWRITE_CONTENT) + .defaultValue(OUTPUT_NEW_ATTRIBUTE) + .build(); + + public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("include-core-userSpecifiedAttributes") + .displayName("Include Core Attributes") + .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes, which are " + + "contained in every FlowFile, should be included in the final CSV value generated. The Attribute List property " + + "overrides this setting.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder() + .name(("null-value")) + .displayName("Null Value") + .description("If true a non existing or empty attribute will be 'null' in the resulting CSV. If false an empty " + + "string will be placed in the CSV") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully converted attributes to CSV").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed to convert attributes to CSV").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private volatile Set<String> userSpecifiedAttributes; + private volatile Boolean includeCoreAttributes; + private volatile Set<String> coreAttributes; + private volatile boolean destinationContent; + private volatile boolean nullValForEmptyString; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(ATTRIBUTES_LIST); + properties.add(DESTINATION); + properties.add(INCLUDE_CORE_ATTRIBUTES); + properties.add(NULL_VALUE_FOR_EMPTY_STRING); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + + private Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> userSpecifiedAttributes) { + Map<String, String> result; + if (!userSpecifiedAttributes.isEmpty()) { + //the user gave a list of attributes + result = new HashMap<>(userSpecifiedAttributes.size()); + for (String attribute : userSpecifiedAttributes) { + String val = ff.getAttribute(attribute); + if (val != null && !val.isEmpty()) { + result.put(attribute, val); + } else { + if (nullValForEmptyString) { + result.put(attribute, "null"); + } else { + result.put(attribute, ""); + } + } + } + } else { + //the user did not give a list of attributes, take all the attributes from the flowfile + Map<String, String> ffAttributes = ff.getAttributes(); + result = new HashMap<>(ffAttributes.size()); + for (Map.Entry<String, String> e : ffAttributes.entrySet()) { + result.put(e.getKey(), e.getValue()); + } + } + + //now glue on the core attributes if the user wants them. + if(includeCoreAttributes) { + for (String coreAttribute : coreAttributes) { + String val = ff.getAttribute(coreAttribute); + //make sure this coreAttribute is applicable to this flowfile. + if(ff.getAttributes().containsKey(coreAttribute)) { + if (val != null && !val.isEmpty()) { + result.put(coreAttribute, val); + } else { + if (nullValForEmptyString) { + result.put(coreAttribute, "null"); + } else { + result.put(coreAttribute, ""); + } + } + } + } + } else { + //remove core attributes since the user does not want them, unless they are in the attribute list. Attribute List always wins + for (String coreAttribute : coreAttributes) { + //never override user specified attributes, even if the user has selected to exclude core attributes + if(!userSpecifiedAttributes.contains(coreAttribute)) { + result.remove(coreAttribute); + } + } + } + return result; + } + + private Set<String> attributeListStringToSet(String attributeList) { + //take the user specified attribute list string and convert to list of strings. + Set<String> result = new HashSet<>(); + if (StringUtils.isNotBlank(attributeList)) { + String[] ats = StringUtils.split(attributeList, OUTPUT_SEPARATOR); --- End diff -- Another edge case (sorry), attribute names are allowed to have commas in them :( I'm totally fine with not supporting that kind of thing, but maybe add to the description of Attribute List that attribute names containing commas are not allowed. Or if you want to support them, then Attribute List would itself be in CSV format and would have to be parsed as such. I'm good either way, just pointing it out for completeness. > AttributesToCSV > --------------- > > Key: NIFI-1705 > URL: https://issues.apache.org/jira/browse/NIFI-1705 > Project: Apache NiFi > Issue Type: Sub-task > Components: Extensions > Reporter: Randy Gelhausen > > Create a new processor which converts a Flowfile's attributes into CSV > content. > Should support the same configuration options as the AttributesToJSON > processor -- This message was sent by Atlassian JIRA (v6.3.15#6346)