[
https://issues.apache.org/jira/browse/NIFI-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15375849#comment-15375849
]
ASF GitHub Bot commented on NIFI-1942:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/476#discussion_r70716478
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
---
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBigDecimal;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.constraint.DMinMax;
+import org.supercsv.cellprocessor.constraint.Equals;
+import org.supercsv.cellprocessor.constraint.ForbidSubStr;
+import org.supercsv.cellprocessor.constraint.LMinMax;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.constraint.RequireHashCode;
+import org.supercsv.cellprocessor.constraint.RequireSubStr;
+import org.supercsv.cellprocessor.constraint.StrMinMax;
+import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
+import org.supercsv.cellprocessor.constraint.StrRegEx;
+import org.supercsv.cellprocessor.constraint.Strlen;
+import org.supercsv.cellprocessor.constraint.Unique;
+import org.supercsv.cellprocessor.constraint.UniqueHashCode;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"csv", "schema", "validation"})
+@CapabilityDescription("Validates the contents of FlowFiles against a
user-specified CSV schema")
+public class ValidateCsv extends AbstractProcessor {
+
+ private final static List<String> allowedOperators =
Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
+ "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax",
"Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
+ "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax",
"StrNotNullOrEmpty", "StrRegEx", "Unique",
+ "UniqueHashCode");
+
+ public static final PropertyDescriptor SCHEMA = new
PropertyDescriptor.Builder()
+ .name("validate-csv-schema")
+ .displayName("Schema")
+ .description("The schema to be used for validation. Is
expected a comma-delimited string representing" +
+ "the cell processors to apply. The following cell
processors are allowed in the schema definition: " +
+ allowedOperators.toString() + ". Note: cell processors
cannot be nested except with Optional.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor HEADER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-header")
+ .displayName("Header")
+ .description("True if the incoming flow file contains a header
to ignore, false otherwise.")
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor QUOTE_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-quote")
+ .displayName("Quote character")
+ .description("Character used as 'quote' in the incoming data.
Example: \"")
+ .required(true)
+ .defaultValue("\"")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DELIMITER_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-delimiter")
+ .displayName("Delimiter character")
+ .description("Character used as 'delimiter' in the incoming
data. Example: ;")
+ .required(true)
+ .defaultValue(";")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor END_OF_LINE_CHARACTER = new
PropertyDescriptor.Builder()
+ .name("validate-csv-eol")
+ .displayName("End of line symbols")
+ .description("Symbols used as 'end of line' in the incoming
data. Example: \\n")
+ .required(true)
+ .defaultValue("\\n")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_VALID = new Relationship.Builder()
+ .name("valid")
+ .description("FlowFiles that are successfully validated
against the schema are routed to this relationship")
+ .build();
+ public static final Relationship REL_INVALID = new
Relationship.Builder()
+ .name("invalid")
+ .description("FlowFiles that are not valid according to the
specified schema are routed to this relationship")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicReference<List<CellProcessor>> processors = new
AtomicReference<List<CellProcessor>>();
+ private final AtomicReference<CsvPreference> preference = new
AtomicReference<CsvPreference>();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(SCHEMA);
+ properties.add(HEADER);
+ properties.add(DELIMITER_CHARACTER);
+ properties.add(QUOTE_CHARACTER);
+ properties.add(END_OF_LINE_CHARACTER);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_VALID);
+ relationships.add(REL_INVALID);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ String schema = validationContext.getProperty(SCHEMA).getValue();
+ try {
+
this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
+ } catch (Exception e) {
+ final List<ValidationResult> problems = new ArrayList<>(1);
+ problems.add(new
ValidationResult.Builder().subject(SCHEMA.getName())
+ .input(schema)
+ .valid(false)
+ .explanation("Error while parsing the schema: " +
e.getMessage())
+ .build());
+ return problems;
+ }
+ return super.customValidate(validationContext);
+ }
+
+ @OnScheduled
+ public void setPreference(final ProcessContext context) {
+ this.preference.set(new
CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
+
context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
+
context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
+ }
+
+ /**
+ * Method used to parse the string supplied by the user. The string is
converted
+ * to a list of cell processors used to validate the CSV data.
+ * @param schema Schema to parse
+ */
+ private void parseSchema(String schema) {
+ this.processors.set(new ArrayList<CellProcessor>());
+ String remaining = schema;
+ while(remaining.length() > 0) {
+ remaining = setProcessor(remaining);
+ }
+ }
+
+ private String setProcessor(String remaining) {
+ StringBuffer buffer = new StringBuffer();
+ int i = 0;
+ int opening = 0;
+ int closing = 0;
+ while(buffer.length() != remaining.length()) {
+ char c = remaining.charAt(i);
+ i++;
+
+ if(opening == 0 && c == ',') {
+ if(i == 1) {
+ continue;
+ }
+ break;
+ }
+
+ buffer.append(c);
+
+ if(c == '(') {
+ opening++;
+ } else if(c == ')') {
+ closing++;
+ }
+
+ if(opening > 0 && opening == closing) {
+ break;
+ }
+ }
+
+ final String procString = buffer.toString().trim();
+ opening = procString.indexOf('(');
+ String method = procString;
+ String argument = null;
+ if(opening != -1) {
+ argument = method.substring(opening + 1, method.length() - 1);
+ method = method.substring(0, opening);
+ }
+
+ this.processors.get().add(getProcessor(method.toLowerCase(),
argument));
+
+ return remaining.substring(i);
+ }
+
+ private CellProcessor getProcessor(String method, String argument) {
+ switch (method) {
+
+ case "optional":
+ int opening = argument.indexOf('(');
+ String subMethod = argument;
+ String subArgument = null;
+ if(opening != -1) {
+ subArgument = subMethod.substring(opening + 1,
subMethod.length() - 1);
+ subMethod = subMethod.substring(0, opening);
+ }
+ return new Optional(getProcessor(subMethod.toLowerCase(),
subArgument));
+
+ case "parsedate":
+ return new ParseDate(argument.substring(1, argument.length() -
1));
+
+ case "parsedouble":
+ return new ParseDouble();
+
+ case "parsebigdecimal":
+ return new ParseBigDecimal();
+
+ case "parsebool":
+ return new ParseBool();
+
+ case "parsechar":
+ return new ParseChar();
+
+ case "parseint":
+ return new ParseInt();
+
+ case "parselong":
+ return new ParseLong();
+
+ case "notnull":
+ return new NotNull();
+
+ case "strregex":
+ return new StrRegEx(argument.substring(1, argument.length() -
1));
+
+ case "unique":
+ return new Unique();
+
+ case "uniquehashcode":
+ return new UniqueHashCode();
+
+ case "strlen":
+ String[] splts = argument.split(",");
+ int[] requiredLengths = new int[splts.length];
+ for(int i = 0; i < splts.length; i++) {
+ requiredLengths[i] = Integer.parseInt(splts[i]);
+ }
+ return new Strlen(requiredLengths);
+
+ case "strminmax":
+ String[] splits = argument.split(",");
+ return new StrMinMax(Long.parseLong(splits[0]),
Long.parseLong(splits[1]));
+
+ case "lminmax":
+ String[] args = argument.split(",");
+ return new LMinMax(Long.parseLong(args[0]),
Long.parseLong(args[1]));
+
+ case "dminmax":
+ String[] doubles = argument.split(",");
+ return new DMinMax(Double.parseDouble(doubles[0]),
Double.parseDouble(doubles[1]));
+
+ case "equals":
+ return new Equals();
+
+ case "forbidsubstr":
+ String[] forbiddenSubStrings = argument.split(",");
+ for (int i = 0; i < forbiddenSubStrings.length; i++) {
+ forbiddenSubStrings[i] =
forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1);
+ }
+ return new ForbidSubStr(forbiddenSubStrings);
+
+ case "requiresubstr":
+ String[] requiredSubStrings = argument.split(",");
+ for (int i = 0; i < requiredSubStrings.length; i++) {
+ requiredSubStrings[i] = requiredSubStrings[i].substring(1,
requiredSubStrings[i].length() - 1);
+ }
+ return new RequireSubStr(requiredSubStrings);
+
+ case "strnotnullorempty":
+ return new StrNotNullOrEmpty();
+
+ case "requirehashcode":
+ String[] hashs = argument.split(",");
+ int[] hashcodes = new int[hashs.length];
+ for(int i = 0; i < hashs.length; i++) {
+ hashcodes[i] = Integer.parseInt(hashs[i]);
+ }
+ return new RequireHashCode(hashcodes);
+
+ case "null":
+ return null;
+
+ default:
+ throw new IllegalArgumentException(method + " is not an
allowed method to define a Cell Processor");
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) {
+ final List<FlowFile> flowFiles = session.get(50);
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final CsvPreference csvPref = this.preference.get();
+ final boolean header = context.getProperty(HEADER).asBoolean();
+ final ComponentLog logger = getLogger();
+ final CellProcessor[] cellProcs =
this.processors.get().toArray(new CellProcessor[this.processors.get().size()]);
--- End diff --
Can this be done in the onScheduled? I feel there is a decent amount of
allocation and processing here that could be re-used across invocations.
> Create a processor to validate CSV against a user-supplied schema
> -----------------------------------------------------------------
>
> Key: NIFI-1942
> URL: https://issues.apache.org/jira/browse/NIFI-1942
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Pierre Villard
> Assignee: Pierre Villard
> Priority: Minor
> Fix For: 1.0.0
>
>
> In order to extend the set of "quality control" processors, it would be
> interesting to have a processor validating CSV formatted flow files against a
> user-specified schema.
> Flow files that validate against the schema would be routed to the "valid"
> relationship, while flow files that fail validation would be routed to the
> "invalid" relationship.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)