Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/476#discussion_r71088596
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
---
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBigDecimal;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.constraint.DMinMax;
+import org.supercsv.cellprocessor.constraint.Equals;
+import org.supercsv.cellprocessor.constraint.ForbidSubStr;
+import org.supercsv.cellprocessor.constraint.LMinMax;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.constraint.RequireHashCode;
+import org.supercsv.cellprocessor.constraint.RequireSubStr;
+import org.supercsv.cellprocessor.constraint.StrMinMax;
+import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
+import org.supercsv.cellprocessor.constraint.StrRegEx;
+import org.supercsv.cellprocessor.constraint.Strlen;
+import org.supercsv.cellprocessor.constraint.Unique;
+import org.supercsv.cellprocessor.constraint.UniqueHashCode;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"csv", "schema", "validation"})
+@CapabilityDescription("Validates the contents of FlowFiles against a
user-specified CSV schema. " +
+ "Take a look at the additional documentation of this processor for
some schema examples.")
+public class ValidateCsv extends AbstractProcessor {
+
+ private final static List<String> allowedOperators =
Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
+ "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax",
"Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
+ "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax",
"StrNotNullOrEmpty", "StrRegEx", "Unique",
+ "UniqueHashCode");
+
+ public static final PropertyDescriptor SCHEMA = new
PropertyDescriptor.Builder()
+ .name("validate-csv-schema")
+ .displayName("Schema")
+ .description("The schema to be used for validation. Is
expected a comma-delimited string representing" +
+ "the cell processors to apply. The following cell
processors are allowed in the schema definition: " +
+ allowedOperators.toString() + ". Note: cell processors
cannot be nested except with Optional.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor HEADER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-header")
+ .displayName("Header")
+ .description("True if the incoming flow file contains a header
to ignore, false otherwise.")
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor QUOTE_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-quote")
+ .displayName("Quote character")
+ .description("Character used as 'quote' in the incoming data.
Example: \"")
+ .required(true)
+ .defaultValue("\"")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DELIMITER_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-delimiter")
+ .displayName("Delimiter character")
+ .description("Character used as 'delimiter' in the incoming
data. Example: ,")
+ .required(true)
+ .defaultValue(",")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor END_OF_LINE_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-eol")
+ .displayName("End of line symbols")
+ .description("Symbols used as 'end of line' in the incoming
data. Example: \\n")
+ .required(true)
+ .defaultValue("\\n")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_VALID = new Relationship.Builder()
+ .name("valid")
+ .description("FlowFiles that are successfully validated
against the schema are routed to this relationship")
+ .build();
+ public static final Relationship REL_INVALID = new
Relationship.Builder()
+ .name("invalid")
+ .description("FlowFiles that are not valid according to the
specified schema are routed to this relationship")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicReference<CellProcessor[]> processors = new
AtomicReference<CellProcessor[]>();
+ private final AtomicReference<CsvPreference> preference = new
AtomicReference<CsvPreference>();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(SCHEMA);
+ properties.add(HEADER);
+ properties.add(DELIMITER_CHARACTER);
+ properties.add(QUOTE_CHARACTER);
+ properties.add(END_OF_LINE_CHARACTER);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_VALID);
+ relationships.add(REL_INVALID);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ String schema = validationContext.getProperty(SCHEMA).getValue();
+ try {
+
this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
+ } catch (Exception e) {
+ final List<ValidationResult> problems = new ArrayList<>(1);
+ problems.add(new
ValidationResult.Builder().subject(SCHEMA.getName())
+ .input(schema)
+ .valid(false)
+ .explanation("Error while parsing the schema: " +
e.getMessage())
+ .build());
+ return problems;
+ }
+ return super.customValidate(validationContext);
+ }
+
+ @OnScheduled
+ public void setPreference(final ProcessContext context) {
+ this.preference.set(new
CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
+
context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
+
context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
+ }
+
+ /**
+ * Method used to parse the string supplied by the user. The string is
converted
+ * to a list of cell processors used to validate the CSV data.
+ * @param schema Schema to parse
+ */
+ private void parseSchema(String schema) {
+ List<CellProcessor> processorsList = new
ArrayList<CellProcessor>();
+
+ String remaining = schema;
+ while(remaining.length() > 0) {
+ remaining = setProcessor(remaining, processorsList);
+ }
+
+ this.processors.set(processorsList.toArray(new
CellProcessor[processorsList.size()]));
+ }
+
+ private String setProcessor(String remaining, List<CellProcessor>
processorsList) {
+ StringBuffer buffer = new StringBuffer();
+ int i = 0;
+ int opening = 0;
+ int closing = 0;
+ while(buffer.length() != remaining.length()) {
+ char c = remaining.charAt(i);
+ i++;
+
+ if(opening == 0 && c == ',') {
+ if(i == 1) {
+ continue;
+ }
+ break;
+ }
+
+ buffer.append(c);
+
+ if(c == '(') {
+ opening++;
+ } else if(c == ')') {
+ closing++;
+ }
+
+ if(opening > 0 && opening == closing) {
+ break;
+ }
+ }
+
+ final String procString = buffer.toString().trim();
+ opening = procString.indexOf('(');
+ String method = procString;
+ String argument = null;
+ if(opening != -1) {
+ argument = method.substring(opening + 1, method.length() - 1);
+ method = method.substring(0, opening);
+ }
+
+ processorsList.add(getProcessor(method.toLowerCase(), argument));
+
+ return remaining.substring(i);
+ }
+
+ private CellProcessor getProcessor(String method, String argument) {
+ switch (method) {
+
+ case "optional":
+ int opening = argument.indexOf('(');
+ String subMethod = argument;
+ String subArgument = null;
+ if(opening != -1) {
+ subArgument = subMethod.substring(opening + 1,
subMethod.length() - 1);
+ subMethod = subMethod.substring(0, opening);
+ }
+ return new Optional(getProcessor(subMethod.toLowerCase(),
subArgument));
+
+ case "parsedate":
+ return new ParseDate(argument.substring(1,
argument.length() - 1));
+
+ case "parsedouble":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseDouble does
not expect any argument but has " + argument);
+ return new ParseDouble();
+
+ case "parsebigdecimal":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseBigDecimal
does not expect any argument but has " + argument);
+ return new ParseBigDecimal();
+
+ case "parsebool":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseBool does not
expect any argument but has " + argument);
+ return new ParseBool();
+
+ case "parsechar":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseChar does not
expect any argument but has " + argument);
+ return new ParseChar();
+
+ case "parseint":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseInt does not
expect any argument but has " + argument);
+ return new ParseInt();
+
+ case "parselong":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseLong does not
expect any argument but has " + argument);
+ return new ParseLong();
+
+ case "notnull":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("NotNull does not
expect any argument but has " + argument);
+ return new NotNull();
+
+ case "strregex":
+ return new StrRegEx(argument.substring(1,
argument.length() - 1));
+
+ case "unique":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Unique does not
expect any argument but has " + argument);
+ return new Unique();
+
+ case "uniquehashcode":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("UniqueHashCode
does not expect any argument but has " + argument);
+ return new UniqueHashCode();
+
+ case "strlen":
+ String[] splts = argument.split(",");
+ int[] requiredLengths = new int[splts.length];
+ for(int i = 0; i < splts.length; i++) {
+ requiredLengths[i] = Integer.parseInt(splts[i]);
+ }
+ return new Strlen(requiredLengths);
+
+ case "strminmax":
+ String[] splits = argument.split(",");
+ return new StrMinMax(Long.parseLong(splits[0]),
Long.parseLong(splits[1]));
+
+ case "lminmax":
+ String[] args = argument.split(",");
+ return new LMinMax(Long.parseLong(args[0]),
Long.parseLong(args[1]));
+
+ case "dminmax":
+ String[] doubles = argument.split(",");
+ return new DMinMax(Double.parseDouble(doubles[0]),
Double.parseDouble(doubles[1]));
+
+ case "equals":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Equals does not
expect any argument but has " + argument);
+ return new Equals();
+
+ case "forbidsubstr":
+ String[] forbiddenSubStrings = argument.split(",");
+ for (int i = 0; i < forbiddenSubStrings.length; i++) {
+ forbiddenSubStrings[i] =
forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1);
+ }
+ return new ForbidSubStr(forbiddenSubStrings);
+
+ case "requiresubstr":
+ String[] requiredSubStrings = argument.split(",");
+ for (int i = 0; i < requiredSubStrings.length; i++) {
+ requiredSubStrings[i] =
requiredSubStrings[i].substring(1, requiredSubStrings[i].length() - 1);
+ }
+ return new RequireSubStr(requiredSubStrings);
+
+ case "strnotnullorempty":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("StrNotNullOrEmpty
does not expect any argument but has " + argument);
+ return new StrNotNullOrEmpty();
+
+ case "requirehashcode":
+ String[] hashs = argument.split(",");
+ int[] hashcodes = new int[hashs.length];
+ for(int i = 0; i < hashs.length; i++) {
+ hashcodes[i] = Integer.parseInt(hashs[i]);
+ }
+ return new RequireHashCode(hashcodes);
+
+ case "null":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Null does not
expect any argument but has " + argument);
+ return null;
+
+ default:
+ throw new IllegalArgumentException(method + " is not an
allowed method to define a Cell Processor");
--- End diff --
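This should have quotes around "method". If a user has two commas in a row,
"method" is empty and the resulting message (which then starts with
" is not an allowed method") can be quite confusing.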
Also, an explicit check for whether "method" is empty (suggesting the user
has two commas in a row) would be nice.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---