Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/476#discussion_r72543424
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
---
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBigDecimal;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.constraint.DMinMax;
+import org.supercsv.cellprocessor.constraint.Equals;
+import org.supercsv.cellprocessor.constraint.ForbidSubStr;
+import org.supercsv.cellprocessor.constraint.IsIncludedIn;
+import org.supercsv.cellprocessor.constraint.LMinMax;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.constraint.RequireHashCode;
+import org.supercsv.cellprocessor.constraint.RequireSubStr;
+import org.supercsv.cellprocessor.constraint.StrMinMax;
+import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
+import org.supercsv.cellprocessor.constraint.StrRegEx;
+import org.supercsv.cellprocessor.constraint.Strlen;
+import org.supercsv.cellprocessor.constraint.Unique;
+import org.supercsv.cellprocessor.constraint.UniqueHashCode;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"csv", "schema", "validation"})
+@CapabilityDescription("Validates the contents of FlowFiles against a
user-specified CSV schema. " +
+ "Take a look at the additional documentation of this processor for
some schema examples.")
+public class ValidateCsv extends AbstractProcessor {
+
+ private final static List<String> allowedOperators =
Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
+ "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax",
"Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
+ "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax",
"StrNotNullOrEmpty", "StrRegEx", "Unique",
+ "UniqueHashCode", "IsIncludedIn");
+
+ private static final String routeWholeFlowFile = "Route to 'valid' the
whole FlowFile if the CSV is valid";
+ private static final String routeLinesIndividually = "Route to 'valid'
a FlowFile containing the valid lines and"
+ + " to 'invalid' a FlowFile containing the invalid lines";
+
+ public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new
AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
+ "In case an error is found in the CSV file, the whole flow
file will be routed to the 'invalid' relationship. "
+ + "This option offers best performances.");
+
+ public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new
AllowableValue(routeLinesIndividually, routeLinesIndividually,
+ "In case an error is found, the input CSV file will be split
into two FlowFiles: one routed to the 'valid' "
+ + "relationship containing all the correct lines and
one routed to the 'invalid' relationship containing all "
+ + "the incorrect lines. Take care if choosing this
option while using Unique cell processors in schema definition.");
+
+ public static final PropertyDescriptor SCHEMA = new
PropertyDescriptor.Builder()
+ .name("validate-csv-schema")
+ .displayName("Schema")
+ .description("The schema to be used for validation. Is
expected a comma-delimited string representing the cell "
+ + "processors to apply. The following cell processors
are allowed in the schema definition: "
+ + allowedOperators.toString() + ". Note: cell
processors cannot be nested except with Optional.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor HEADER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-header")
+ .displayName("Header")
+ .description("True if the incoming flow file contains a header
to ignore, false otherwise.")
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor QUOTE_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-quote")
+ .displayName("Quote character")
+ .description("Character used as 'quote' in the incoming data.
Example: \"")
+ .required(true)
+ .defaultValue("\"")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DELIMITER_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-delimiter")
+ .displayName("Delimiter character")
+ .description("Character used as 'delimiter' in the incoming
data. Example: ,")
+ .required(true)
+ .defaultValue(",")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor END_OF_LINE_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-eol")
+ .displayName("End of line symbols")
+ .description("Symbols used as 'end of line' in the incoming
data. Example: \\n")
+ .required(true)
+ .defaultValue("\\n")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor VALIDATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("validate-csv-strategy")
+ .displayName("Validation strategy")
+ .description("Strategy to apply when routing input files to
output relationships.")
+ .required(true)
+ .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
+ .allowableValues(VALIDATE_LINES_INDIVIDUALLY,
VALIDATE_WHOLE_FLOWFILE)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_VALID = new Relationship.Builder()
+ .name("valid")
+ .description("FlowFiles that are successfully validated
against the schema are routed to this relationship")
+ .build();
+ public static final Relationship REL_INVALID = new
Relationship.Builder()
+ .name("invalid")
+ .description("FlowFiles that are not valid according to the
specified schema are routed to this relationship")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicReference<CellProcessor[]> processors = new
AtomicReference<CellProcessor[]>();
+ private final AtomicReference<CsvPreference> preference = new
AtomicReference<CsvPreference>();
+
+ /**
+  * Prepares the immutable list of supported properties and the immutable set of
+  * relationships exposed by this processor.
+  *
+  * @param context initialization context (not used here)
+  */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+     // The list keeps the order in which the descriptors are declared here.
+     final List<PropertyDescriptor> descriptors = new ArrayList<>(Arrays.asList(
+             SCHEMA,
+             HEADER,
+             DELIMITER_CHARACTER,
+             QUOTE_CHARACTER,
+             END_OF_LINE_CHARACTER,
+             VALIDATION_STRATEGY));
+     this.properties = Collections.unmodifiableList(descriptors);
+
+     final Set<Relationship> routes = new HashSet<>(Arrays.asList(REL_VALID, REL_INVALID));
+     this.relationships = Collections.unmodifiableSet(routes);
+ }
+
+ /**
+  * @return the unmodifiable set of relationships prepared in {@code init}
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+     return this.relationships;
+ }
+
+ /**
+  * @return the unmodifiable list of property descriptors prepared in {@code init}
+  */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     return this.properties;
+ }
+
+ /**
+  * Validates the Schema property by trying to parse it into cell processors.
+  * A successful parse also stores the resulting cell processors in this instance.
+  *
+  * @param validationContext context giving access to the configured property values
+  * @return a single invalid result describing the parsing error when the schema cannot
+  *         be parsed, otherwise whatever the parent implementation returns
+  */
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+     final String schema = validationContext.getProperty(SCHEMA).getValue();
+     try {
+         this.parseSchema(schema);
+     } catch (Exception e) {
+         // Some failures (a NullPointerException for instance) carry no message; fall back
+         // to the exception itself so the user never reads "Error while parsing the schema: null".
+         final String reason = e.getMessage() != null ? e.getMessage() : e.toString();
+         final List<ValidationResult> problems = new ArrayList<>(1);
+         problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
+                 .input(schema)
+                 .valid(false)
+                 .explanation("Error while parsing the schema: " + reason)
+                 .build());
+         return problems;
+     }
+     return super.customValidate(validationContext);
+ }
+
+ /**
+  * Builds the CSV preference (quote character, delimiter character and end of line
+  * symbols) from the processor configuration and stores it for later use.
+  *
+  * @param context process context giving access to the configured property values
+  */
+ @OnScheduled
+ public void setPreference(final ProcessContext context) {
+     // The property value holds the characters exactly as they were typed: the default value
+     // is the two characters '\' and 'n', not a line feed. Translate the usual escape
+     // sequences into the control characters they stand for before handing them to Super CSV.
+     final String endOfLine = context.getProperty(END_OF_LINE_CHARACTER).getValue()
+             .replace("\\n", "\n")
+             .replace("\\r", "\r")
+             .replace("\\t", "\t");
+     final char quote = context.getProperty(QUOTE_CHARACTER).getValue().charAt(0);
+     final char delimiter = context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0);
+
+     this.preference.set(new CsvPreference.Builder(quote, delimiter, endOfLine).build());
+ }
+
+ /**
+ * Method used to parse the string supplied by the user. The string is
converted
+ * to a list of cell processors used to validate the CSV data.
+ * @param schema Schema to parse
+ */
+ private void parseSchema(String schema) {
+ List<CellProcessor> processorsList = new
ArrayList<CellProcessor>();
+
+ String remaining = schema;
+ while(remaining.length() > 0) {
+ remaining = setProcessor(remaining, processorsList);
+ }
+
+ this.processors.set(processorsList.toArray(new
CellProcessor[processorsList.size()]));
+ }
+
+ /**
+  * Reads the first cell processor definition found in the given schema string, adds the
+  * matching cell processor to the list and returns the part of the string left to parse.
+  * A definition ends at the first comma met outside parentheses, or right after the
+  * parenthesis that balances the opening ones.
+  *
+  * @param remaining part of the schema that has not been parsed yet
+  * @param processorsList list receiving the cell processor built from the definition
+  * @return the part of the schema following the definition that was consumed
+  * @throws IllegalArgumentException if the definition does not name an allowed cell processor
+  */
+ private String setProcessor(String remaining, List<CellProcessor> processorsList) {
+     final StringBuilder buffer = new StringBuilder();
+     int i = 0;
+     int opening = 0;
+     int closing = 0;
+     // Bound the scan by the read position. Comparing the buffer length with the input length
+     // overran the input as soon as a leading comma had been skipped, e.g. on the second
+     // definition of "StrMinMax(1,2),ParseInt".
+     while (i < remaining.length()) {
+         final char c = remaining.charAt(i);
+         i++;
+
+         if (opening == 0 && c == ',') {
+             if (i == 1) {
+                 // separator left in front by a previous definition that ended on ')'
+                 continue;
+             }
+             break;
+         }
+
+         buffer.append(c);
+
+         if (c == '(') {
+             opening++;
+         } else if (c == ')') {
+             closing++;
+         }
+
+         if (opening > 0 && opening == closing) {
+             break;
+         }
+     }
+
+     final String procString = buffer.toString().trim();
+     final int parenthesis = procString.indexOf('(');
+     String method = procString;
+     String argument = null;
+     if (parenthesis != -1) {
+         // the argument is what stands between the first '(' and the last character
+         argument = method.substring(parenthesis + 1, method.length() - 1);
+         method = method.substring(0, parenthesis);
+     }
+
+     // Locale.ROOT keeps names such as "ParseInt" matching whatever the default locale is
+     // (a Turkish default locale lower-cases 'I' to a dotless i).
+     processorsList.add(getProcessor(method.toLowerCase(java.util.Locale.ROOT), argument));
+
+     return remaining.substring(i);
+ }
+
+ private CellProcessor getProcessor(String method, String argument) {
+ switch (method) {
+
+ case "optional":
+ int opening = argument.indexOf('(');
+ String subMethod = argument;
+ String subArgument = null;
+ if(opening != -1) {
+ subArgument = subMethod.substring(opening + 1,
subMethod.length() - 1);
+ subMethod = subMethod.substring(0, opening);
+ }
+ return new Optional(getProcessor(subMethod.toLowerCase(),
subArgument));
+
+ case "parsedate":
+ return new ParseDate(argument.substring(1,
argument.length() - 1));
+
+ case "parsedouble":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseDouble does
not expect any argument but has " + argument);
+ return new ParseDouble();
+
+ case "parsebigdecimal":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseBigDecimal
does not expect any argument but has " + argument);
+ return new ParseBigDecimal();
+
+ case "parsebool":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseBool does not
expect any argument but has " + argument);
+ return new ParseBool();
+
+ case "parsechar":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseChar does not
expect any argument but has " + argument);
+ return new ParseChar();
+
+ case "parseint":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseInt does not
expect any argument but has " + argument);
+ return new ParseInt();
+
+ case "parselong":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseLong does not
expect any argument but has " + argument);
+ return new ParseLong();
+
+ case "notnull":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("NotNull does not
expect any argument but has " + argument);
+ return new NotNull();
+
+ case "strregex":
+ return new StrRegEx(argument.substring(1,
argument.length() - 1));
+
+ case "unique":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Unique does not
expect any argument but has " + argument);
+ return new Unique();
+
+ case "uniquehashcode":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("UniqueHashCode
does not expect any argument but has " + argument);
+ return new UniqueHashCode();
+
+ case "strlen":
+ String[] splts = argument.split(",");
+ int[] requiredLengths = new int[splts.length];
+ for(int i = 0; i < splts.length; i++) {
+ requiredLengths[i] = Integer.parseInt(splts[i]);
+ }
+ return new Strlen(requiredLengths);
+
+ case "strminmax":
+ String[] splits = argument.split(",");
+ return new StrMinMax(Long.parseLong(splits[0]),
Long.parseLong(splits[1]));
+
+ case "lminmax":
+ String[] args = argument.split(",");
+ return new LMinMax(Long.parseLong(args[0]),
Long.parseLong(args[1]));
+
+ case "dminmax":
+ String[] doubles = argument.split(",");
+ return new DMinMax(Double.parseDouble(doubles[0]),
Double.parseDouble(doubles[1]));
+
+ case "equals":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Equals does not
expect any argument but has " + argument);
+ return new Equals();
+
+ case "forbidsubstr":
+ String[] forbiddenSubStrings = argument.replaceAll("\"",
"").split(",[ ]*");
+ return new ForbidSubStr(forbiddenSubStrings);
+
+ case "requiresubstr":
+ String[] requiredSubStrings = argument.replaceAll("\"",
"").split(",[ ]*");
+ return new RequireSubStr(requiredSubStrings);
+
+ case "strnotnullorempty":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("StrNotNullOrEmpty
does not expect any argument but has " + argument);
+ return new StrNotNullOrEmpty();
+
+ case "requirehashcode":
+ String[] hashs = argument.split(",");
+ int[] hashcodes = new int[hashs.length];
+ for(int i = 0; i < hashs.length; i++) {
+ hashcodes[i] = Integer.parseInt(hashs[i]);
+ }
+ return new RequireHashCode(hashcodes);
+
+ case "null":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Null does not
expect any argument but has " + argument);
+ return null;
+
+ case "isincludedin":
+ String[] elements = argument.replaceAll("\"",
"").split(",[ ]*");
+ return new IsIncludedIn(elements);
+
+ default:
+ throw new IllegalArgumentException("[" + method + "] is
not an allowed method to define a Cell Processor");
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final CsvPreference csvPref = this.preference.get();
+ final boolean header = context.getProperty(HEADER).asBoolean();
+ final ComponentLog logger = getLogger();
+ final CellProcessor[] cellProcs = this.processors.get();
+ final boolean isWholeFFValidation =
context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
+
+ final AtomicReference<Boolean> valid = new
AtomicReference<Boolean>(true);
+ final AtomicReference<Boolean> isFirstLine = new
AtomicReference<Boolean>(true);
+ final AtomicReference<Integer> okCount = new
AtomicReference<Integer>(0);
+ final AtomicReference<Integer> totalCount = new
AtomicReference<Integer>(0);
+ final AtomicReference<FlowFile> invalidFF = new
AtomicReference<FlowFile>(null);
+ final AtomicReference<FlowFile> validFF = new
AtomicReference<FlowFile>(null);
+
+ if(!isWholeFFValidation) {
+ invalidFF.set(session.create(flowFile));
+ validFF.set(session.create(flowFile));
+ }
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ NifiCsvListReader listReader = null;
+ try {
+ listReader = new NifiCsvListReader(new
InputStreamReader(in), csvPref);
+
+ // handling of header
+ if(header) {
+ List<String> headerList = listReader.read();
+ if(!isWholeFFValidation) {
+ invalidFF.set(session.append(invalidFF.get(),
new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out)
throws IOException {
+ out.write(print(headerList, csvPref,
isFirstLine.get()));
+ }
+ }));
+ validFF.set(session.append(validFF.get(), new
OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out)
throws IOException {
+ out.write(print(headerList, csvPref,
isFirstLine.get()));
+ }
+ }));
+ isFirstLine.set(false);
+ }
+ }
+
+ boolean stop = false;
+
+ while (!stop) {
+ try {
+
+ final List<Object> list =
listReader.read(cellProcs);
+ stop = list == null;
+
+ if(!isWholeFFValidation && !stop) {
+ validFF.set(session.append(validFF.get(),
new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out)
throws IOException {
+ out.write(print(list, csvPref,
isFirstLine.get()));
+ }
+ }));
+ okCount.set(okCount.get() + 1);
+
+ if(isFirstLine.get()) {
+ isFirstLine.set(false);
+ }
+ }
+
+ } catch (final SuperCsvCellProcessorException e) {
--- End diff --
This should probably also catch "SuperCsvException". I ran into one when
the number of cell processors in my processor list didn't match the number of
columns passed in.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---