[ 
https://issues.apache.org/jira/browse/NIFI-1705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941068#comment-15941068
 ] 

ASF GitHub Bot commented on NIFI-1705:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1589#discussion_r107990719
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java
 ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.lang3.StringEscapeUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +
    +import java.io.BufferedOutputStream;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.ArrayList;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Collections;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"csv", "attributes", "flowfile"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Generates a CSV representation of the input 
FlowFile Attributes. The resulting CSV " +
    +        "can be written to either a newly generated attribute named 
'CSVAttributes' or written to the FlowFile as content.  " +
    +        "If the attribute value contains a comma, newline or double quote, 
then the attribute value will be " +
    +        "escaped with double quotes.  Any double quote characters in the 
attribute value are escaped with " +
    +        "another double quote.  If the attribute value does not contain a 
comma, newline or double quote, then the " +
    +        "attribute value is returned unchanged.")
    +@WritesAttribute(attribute = "CSVAttributes", description = "CSV 
representation of Attributes")
    +public class AttributesToCSV extends AbstractProcessor {
    +
    +    private static final String OUTPUT_NEW_ATTRIBUTE = 
"flowfile-attribute";
    +    private static final String OUTPUT_OVERWRITE_CONTENT = 
"flowfile-content";
    +    private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes";
    +    private static final String OUTPUT_SEPARATOR = ",";
    +    private static final String OUTPUT_MIME_TYPE = "text/csv";
    +
    +
    +    public static final PropertyDescriptor ATTRIBUTES_LIST = new 
PropertyDescriptor.Builder()
    +            .name("attribute-list")
    +            .displayName("Attribute List")
    +            .description("Comma separated list of attributes to be 
included in the resulting CSV. If this value " +
    +                    "is left empty then all existing Attributes will be 
included. This list of attributes is " +
    +                    "case sensitive. If an attribute specified in the list 
is not found it will be emitted " +
    +                    "to the resulting CSV with an empty string or null 
depending on the 'Null Value' property. " +
    +                    "If a core attribute is specified in this list " +
    +                    "and the 'Include Core Attributes' property is false, 
the core attribute will be included. The attribute list " +
    +                    "ALWAYS wins.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
    +            .name("destination")
    +            .displayName("Destination")
    +            .description("Control if CSV value is written as a new 
flowfile attribute 'CSVAttributes' " +
    +                    "or written in the flowfile content. Writing to 
flowfile content will overwrite any " +
    +                    "existing flowfile content.")
    +            .required(true)
    +            .allowableValues(OUTPUT_NEW_ATTRIBUTE, 
OUTPUT_OVERWRITE_CONTENT)
    +            .defaultValue(OUTPUT_NEW_ATTRIBUTE)
    +            .build();
    +
    +    public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new 
PropertyDescriptor.Builder()
    +            .name("include-core-userSpecifiedAttributes")
    +            .displayName("Include Core Attributes")
    +            .description("Determines if the FlowFile 
org.apache.nifi.flowfile.attributes.CoreAttributes, which are " +
    +                    "contained in every FlowFile, should be included in 
the final CSV value generated.  The Attribute List property " +
    +                    "overrides this setting.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = 
new PropertyDescriptor.Builder()
    +            .name(("null-value"))
    +            .displayName("Null Value")
    +            .description("If true a non existing or empty attribute will 
be 'null' in the resulting CSV. If false an empty " +
    +                    "string will be placed in the CSV")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
    +            .description("Successfully converted attributes to 
CSV").build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
    +            .description("Failed to convert attributes to CSV").build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private volatile Set<String> userSpecifiedAttributes;
    +    private volatile Boolean includeCoreAttributes;
    +    private volatile Set<String> coreAttributes;
    +    private volatile boolean destinationContent;
    +    private volatile boolean nullValForEmptyString;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(ATTRIBUTES_LIST);
    +        properties.add(DESTINATION);
    +        properties.add(INCLUDE_CORE_ATTRIBUTES);
    +        properties.add(NULL_VALUE_FOR_EMPTY_STRING);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    private Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, 
Set<String> userSpecifiedAttributes) {
    +        Map<String, String> result;
    +        if (!userSpecifiedAttributes.isEmpty()) {
    +            //the user gave a list of attributes
    +            result = new HashMap<>(userSpecifiedAttributes.size());
    +            for (String attribute : userSpecifiedAttributes) {
    +                String val = ff.getAttribute(attribute);
    +                if (val != null && !val.isEmpty()) {
    +                    result.put(attribute, val);
    +                } else {
    +                    if (nullValForEmptyString) {
    +                        result.put(attribute, "null");
    +                    } else {
    +                        result.put(attribute, "");
    +                    }
    +                }
    +            }
    +        } else {
    +            //the user did not give a list of attributes, take all the 
attributes from the flowfile
    +            Map<String, String> ffAttributes = ff.getAttributes();
    +            result = new HashMap<>(ffAttributes.size());
    +            for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
    +                    result.put(e.getKey(), e.getValue());
    +            }
    +        }
    +
    +        //now glue on the core attributes if the user wants them.
    +        if(includeCoreAttributes) {
    +            for (String coreAttribute : coreAttributes) {
    +                String val = ff.getAttribute(coreAttribute);
    +                //make sure this coreAttribute is applicable to this 
flowfile.
    +                if(ff.getAttributes().containsKey(coreAttribute)) {
    +                    if (val != null && !val.isEmpty()) {
    +                        result.put(coreAttribute, val);
    +                    } else {
    +                        if (nullValForEmptyString) {
    +                            result.put(coreAttribute, "null");
    +                        } else {
    +                            result.put(coreAttribute, "");
    +                        }
    +                    }
    +                }
    +            }
    +        } else {
    +            //remove core attributes since the user does not want them, 
unless they are in the attribute list. Attribute List always wins
    +            for (String coreAttribute : coreAttributes) {
    +                //never override user specified attributes, even if the 
user has selected to exclude core attributes
    +                if(!userSpecifiedAttributes.contains(coreAttribute)) {
    +                    result.remove(coreAttribute);
    +                }
    +            }
    +        }
    +        return result;
    +    }
    +
    +    private Set<String> attributeListStringToSet(String attributeList) {
    +        //take the user specified attribute list string and convert to 
list of strings.
    +        Set<String> result = new HashSet<>();
    +        if (StringUtils.isNotBlank(attributeList)) {
    +            String[] ats = StringUtils.split(attributeList, 
OUTPUT_SEPARATOR);
    --- End diff --
    
    Another edge case (sorry), attribute names are allowed to have commas in 
them :( I'm totally fine with not supporting that kind of thing, but maybe add 
to the description of Attribute List that attribute names containing commas are 
not allowed. Or if you want to support them, then Attribute List would itself 
be in CSV format and would have to be parsed as such.  I'm good either way, 
just pointing it out for completeness.


> AttributesToCSV
> ---------------
>
>                 Key: NIFI-1705
>                 URL: https://issues.apache.org/jira/browse/NIFI-1705
>             Project: Apache NiFi
>          Issue Type: Sub-task
>          Components: Extensions
>            Reporter: Randy Gelhausen
>
> Create a new processor which converts a Flowfile's attributes into CSV 
> content.
> Should support the same configuration options as the AttributesToJSON 
> processor



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to