[ 
https://issues.apache.org/jira/browse/NIFI-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381503#comment-15381503
 ] 

ASF GitHub Bot commented on NIFI-1942:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r71088596
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
 ---
    @@ -0,0 +1,433 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a 
user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for 
some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = 
Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", 
"Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", 
"StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is 
expected a comma-delimited string representing" +
    +                    "the cell processors to apply. The following cell 
processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors 
cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header 
to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. 
Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming 
data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming 
data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated 
against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new 
Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the 
specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new 
AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new 
AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            
this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new 
ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + 
e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new 
CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                
context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                
context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is 
converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new 
ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new 
CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> 
processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, 
subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), 
subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, 
argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does 
not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal 
does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not 
expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not 
expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not 
expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not 
expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not 
expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, 
argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not 
expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode 
does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), 
Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), 
Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), 
Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not 
expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.split(",");
    +                for (int i = 0; i < forbiddenSubStrings.length; i++) {
    +                    forbiddenSubStrings[i] = 
forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1);
    +                }
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.split(",");
    +                for (int i = 0; i < requiredSubStrings.length; i++) {
    +                    requiredSubStrings[i] = 
requiredSubStrings[i].substring(1, requiredSubStrings[i].length() - 1);
    +                }
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty 
does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not 
expect any argument but has " + argument);
    +                return null;
    +
    +            default:
    +                throw new IllegalArgumentException(method + " is not an 
allowed method to define a Cell Processor");
    --- End diff --
    
    This should have quotes around "method". If a user has two commas in a row it 
can be quite confusing. 
    
    Also, an explicit check for whether "method" is empty (suggesting the user has 
two commas in a row) would be nice.


> Create a processor to validate CSV against a user-supplied schema
> -----------------------------------------------------------------
>
>                 Key: NIFI-1942
>                 URL: https://issues.apache.org/jira/browse/NIFI-1942
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Minor
>             Fix For: 1.0.0
>
>         Attachments: ValidateCSV.xml
>
>
> In order to extend the set of "quality control" processors, it would be 
> interesting to have a processor validating CSV formatted flow files against a 
> user-specified schema.
> Flow files that validate against the schema would be routed to the "valid" 
> relationship, while flow files that do not would be routed to the "invalid" 
> relationship.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to