[ https://issues.apache.org/jira/browse/NIFI-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381503#comment-15381503 ]
ASF GitHub Bot commented on NIFI-1942: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/476#discussion_r71088596 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java --- @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.supercsv.cellprocessor.Optional; +import org.supercsv.cellprocessor.ParseBigDecimal; +import org.supercsv.cellprocessor.ParseBool; +import org.supercsv.cellprocessor.ParseChar; +import org.supercsv.cellprocessor.ParseDate; +import org.supercsv.cellprocessor.ParseDouble; +import org.supercsv.cellprocessor.ParseInt; +import org.supercsv.cellprocessor.ParseLong; +import org.supercsv.cellprocessor.constraint.DMinMax; +import 
org.supercsv.cellprocessor.constraint.Equals; +import org.supercsv.cellprocessor.constraint.ForbidSubStr; +import org.supercsv.cellprocessor.constraint.LMinMax; +import org.supercsv.cellprocessor.constraint.NotNull; +import org.supercsv.cellprocessor.constraint.RequireHashCode; +import org.supercsv.cellprocessor.constraint.RequireSubStr; +import org.supercsv.cellprocessor.constraint.StrMinMax; +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty; +import org.supercsv.cellprocessor.constraint.StrRegEx; +import org.supercsv.cellprocessor.constraint.Strlen; +import org.supercsv.cellprocessor.constraint.Unique; +import org.supercsv.cellprocessor.constraint.UniqueHashCode; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.exception.SuperCsvCellProcessorException; +import org.supercsv.io.CsvListReader; +import org.supercsv.prefs.CsvPreference; + +@EventDriven +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"csv", "schema", "validation"}) +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " + + "Take a look at the additional documentation of this processor for some schema examples.") +public class ValidateCsv extends AbstractProcessor { + + private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate", + "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null", + "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique", + "UniqueHashCode"); + + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .name("validate-csv-schema") + .displayName("Schema") + .description("The schema to be used for validation. Is expected a comma-delimited string representing" + + "the cell processors to apply. 
The following cell processors are allowed in the schema definition: " + + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder() + .name("validate-csv-header") + .displayName("Header") + .description("True if the incoming flow file contains a header to ignore, false otherwise.") + .required(true) + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder() + .name("validate-csv-quote") + .displayName("Quote character") + .description("Character used as 'quote' in the incoming data. Example: \"") + .required(true) + .defaultValue("\"") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder() + .name("validate-csv-delimiter") + .displayName("Delimiter character") + .description("Character used as 'delimiter' in the incoming data. Example: ,") + .required(true) + .defaultValue(",") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder() + .name("validate-csv-eol") + .displayName("End of line symbols") + .description("Symbols used as 'end of line' in the incoming data. 
Example: \\n") + .required(true) + .defaultValue("\\n") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_VALID = new Relationship.Builder() + .name("valid") + .description("FlowFiles that are successfully validated against the schema are routed to this relationship") + .build(); + public static final Relationship REL_INVALID = new Relationship.Builder() + .name("invalid") + .description("FlowFiles that are not valid according to the specified schema are routed to this relationship") + .build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>(); + private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(SCHEMA); + properties.add(HEADER); + properties.add(DELIMITER_CHARACTER); + properties.add(QUOTE_CHARACTER); + properties.add(END_OF_LINE_CHARACTER); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_VALID); + relationships.add(REL_INVALID); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + String schema = validationContext.getProperty(SCHEMA).getValue(); + try { + this.parseSchema(validationContext.getProperty(SCHEMA).getValue()); + } catch (Exception e) { + final List<ValidationResult> problems = new ArrayList<>(1); + problems.add(new 
ValidationResult.Builder().subject(SCHEMA.getName()) + .input(schema) + .valid(false) + .explanation("Error while parsing the schema: " + e.getMessage()) + .build()); + return problems; + } + return super.customValidate(validationContext); + } + + @OnScheduled + public void setPreference(final ProcessContext context) { + this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0), + context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0), + context.getProperty(END_OF_LINE_CHARACTER).getValue()).build()); + } + + /** + * Method used to parse the string supplied by the user. The string is converted + * to a list of cell processors used to validate the CSV data. + * @param schema Schema to parse + */ + private void parseSchema(String schema) { + List<CellProcessor> processorsList = new ArrayList<CellProcessor>(); + + String remaining = schema; + while(remaining.length() > 0) { + remaining = setProcessor(remaining, processorsList); + } + + this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()])); + } + + private String setProcessor(String remaining, List<CellProcessor> processorsList) { + StringBuffer buffer = new StringBuffer(); + int i = 0; + int opening = 0; + int closing = 0; + while(buffer.length() != remaining.length()) { + char c = remaining.charAt(i); + i++; + + if(opening == 0 && c == ',') { + if(i == 1) { + continue; + } + break; + } + + buffer.append(c); + + if(c == '(') { + opening++; + } else if(c == ')') { + closing++; + } + + if(opening > 0 && opening == closing) { + break; + } + } + + final String procString = buffer.toString().trim(); + opening = procString.indexOf('('); + String method = procString; + String argument = null; + if(opening != -1) { + argument = method.substring(opening + 1, method.length() - 1); + method = method.substring(0, opening); + } + + processorsList.add(getProcessor(method.toLowerCase(), argument)); + + return remaining.substring(i); + } + + private 
CellProcessor getProcessor(String method, String argument) { + switch (method) { + + case "optional": + int opening = argument.indexOf('('); + String subMethod = argument; + String subArgument = null; + if(opening != -1) { + subArgument = subMethod.substring(opening + 1, subMethod.length() - 1); + subMethod = subMethod.substring(0, opening); + } + return new Optional(getProcessor(subMethod.toLowerCase(), subArgument)); + + case "parsedate": + return new ParseDate(argument.substring(1, argument.length() - 1)); + + case "parsedouble": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument); + return new ParseDouble(); + + case "parsebigdecimal": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument); + return new ParseBigDecimal(); + + case "parsebool": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument); + return new ParseBool(); + + case "parsechar": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument); + return new ParseChar(); + + case "parseint": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument); + return new ParseInt(); + + case "parselong": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument); + return new ParseLong(); + + case "notnull": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument); + return new NotNull(); + + case "strregex": + return new StrRegEx(argument.substring(1, argument.length() - 1)); + + case "unique": + 
if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("Unique does not expect any argument but has " + argument); + return new Unique(); + + case "uniquehashcode": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument); + return new UniqueHashCode(); + + case "strlen": + String[] splts = argument.split(","); + int[] requiredLengths = new int[splts.length]; + for(int i = 0; i < splts.length; i++) { + requiredLengths[i] = Integer.parseInt(splts[i]); + } + return new Strlen(requiredLengths); + + case "strminmax": + String[] splits = argument.split(","); + return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1])); + + case "lminmax": + String[] args = argument.split(","); + return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1])); + + case "dminmax": + String[] doubles = argument.split(","); + return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1])); + + case "equals": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("Equals does not expect any argument but has " + argument); + return new Equals(); + + case "forbidsubstr": + String[] forbiddenSubStrings = argument.split(","); + for (int i = 0; i < forbiddenSubStrings.length; i++) { + forbiddenSubStrings[i] = forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1); + } + return new ForbidSubStr(forbiddenSubStrings); + + case "requiresubstr": + String[] requiredSubStrings = argument.split(","); + for (int i = 0; i < requiredSubStrings.length; i++) { + requiredSubStrings[i] = requiredSubStrings[i].substring(1, requiredSubStrings[i].length() - 1); + } + return new RequireSubStr(requiredSubStrings); + + case "strnotnullorempty": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument); + return new 
StrNotNullOrEmpty(); + + case "requirehashcode": + String[] hashs = argument.split(","); + int[] hashcodes = new int[hashs.length]; + for(int i = 0; i < hashs.length; i++) { + hashcodes[i] = Integer.parseInt(hashs[i]); + } + return new RequireHashCode(hashcodes); + + case "null": + if(argument != null && !argument.isEmpty()) + throw new IllegalArgumentException("Null does not expect any argument but has " + argument); + return null; + + default: + throw new IllegalArgumentException(method + " is not an allowed method to define a Cell Processor"); --- End diff -- This should have quotes around "method". If a user has two commas in a row, "method" is empty and the resulting message can be quite confusing. Also, an explicit check for whether "method" is empty (suggesting the user has two commas in a row) would be nice. > Create a processor to validate CSV against a user-supplied schema > ----------------------------------------------------------------- > > Key: NIFI-1942 > URL: https://issues.apache.org/jira/browse/NIFI-1942 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Reporter: Pierre Villard > Assignee: Pierre Villard > Priority: Minor > Fix For: 1.0.0 > > Attachments: ValidateCSV.xml > > > In order to extend the set of "quality control" processors, it would be > interesting to have a processor validating CSV formatted flow files against a > user-specified schema. > Flow files that validate against the schema would be routed to the "valid" relationship, > whereas flow files that do not validate against the schema would be routed to the "invalid" > relationship. -- This message was sent by Atlassian JIRA (v6.3.4#6332)