Repository: nifi
Updated Branches:
  refs/heads/master 022f5a506 -> d838f6129


NIFI-1942 Processor to validate CSV against user-supplied schema

This closes #476

Signed-off-by: jpercivall <joeperciv...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d838f612
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d838f612
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d838f612

Branch: refs/heads/master
Commit: d838f61291d2582592754a37314911b701c6891b
Parents: 022f5a5
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Fri May 27 10:05:16 2016 +0200
Committer: jpercivall <joeperciv...@yahoo.com>
Committed: Fri Sep 16 11:13:07 2016 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   5 +
 .../nifi/processors/standard/ValidateCsv.java   | 618 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 109 ++++
 .../processors/standard/TestValidateCsv.java    | 255 ++++++++
 5 files changed, 988 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 0c27b62..6fbc4d9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -244,6 +244,11 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-standard-utils</artifactId>
             <version>1.1.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>net.sf.supercsv</groupId>
+            <artifactId>super-csv</artifactId>
+            <version>2.4.0</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
new file mode 100644
index 0000000..4788080
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBigDecimal;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.constraint.DMinMax;
+import org.supercsv.cellprocessor.constraint.Equals;
+import org.supercsv.cellprocessor.constraint.ForbidSubStr;
+import org.supercsv.cellprocessor.constraint.IsIncludedIn;
+import org.supercsv.cellprocessor.constraint.LMinMax;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.constraint.RequireHashCode;
+import org.supercsv.cellprocessor.constraint.RequireSubStr;
+import org.supercsv.cellprocessor.constraint.StrMinMax;
+import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
+import org.supercsv.cellprocessor.constraint.StrRegEx;
+import org.supercsv.cellprocessor.constraint.Strlen;
+import org.supercsv.cellprocessor.constraint.Unique;
+import org.supercsv.cellprocessor.constraint.UniqueHashCode;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvException;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"csv", "schema", "validation"})
+@CapabilityDescription("Validates the contents of FlowFiles against a 
user-specified CSV schema. " +
+        "Take a look at the additional documentation of this processor for 
some schema examples.")
+public class ValidateCsv extends AbstractProcessor {
+
+    private final static List<String> allowedOperators = 
Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
+            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", 
"Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
+            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", 
"StrNotNullOrEmpty", "StrRegEx", "Unique",
+            "UniqueHashCode", "IsIncludedIn");
+
+    private static final String routeWholeFlowFile = "FlowFile validation";
+    private static final String routeLinesIndividually = "Line by line 
validation";
+
+    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new 
AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
+            "As soon as an error is found in the CSV file, the validation will 
stop and the whole flow file will be routed to the 'invalid'"
+                    + " relationship. This option offers best performances.");
+
+    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new 
AllowableValue(routeLinesIndividually, routeLinesIndividually,
+            "In case an error is found, the input CSV file will be split into 
two FlowFiles: one routed to the 'valid' "
+                    + "relationship containing all the correct lines and one 
routed to the 'invalid' relationship containing all "
+                    + "the incorrect lines. Take care if choosing this option 
while using Unique cell processors in schema definition:"
+                    + "the first occurrence will be considered valid and the 
next ones as invalid.");
+
+    public static final PropertyDescriptor SCHEMA = new 
PropertyDescriptor.Builder()
+            .name("validate-csv-schema")
+            .displayName("Schema")
+            .description("The schema to be used for validation. Is expected a 
comma-delimited string representing the cell "
+                    + "processors to apply. The following cell processors are 
allowed in the schema definition: "
+                    + allowedOperators.toString() + ". Note: cell processors 
cannot be nested except with Optional.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HEADER = new 
PropertyDescriptor.Builder()
+            .name("validate-csv-header")
+            .displayName("Header")
+            .description("True if the incoming flow file contains a header to 
ignore, false otherwise.")
+            .required(true)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor QUOTE_CHARACTER = new 
PropertyDescriptor.Builder()
+            .name("validate-csv-quote")
+            .displayName("Quote character")
+            .description("Character used as 'quote' in the incoming data. 
Example: \"")
+            .required(true)
+            .defaultValue("\"")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DELIMITER_CHARACTER = new 
PropertyDescriptor.Builder()
+            .name("validate-csv-delimiter")
+            .displayName("Delimiter character")
+            .description("Character used as 'delimiter' in the incoming data. 
Example: ,")
+            .required(true)
+            .defaultValue(",")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new 
PropertyDescriptor.Builder()
+            .name("validate-csv-eol")
+            .displayName("End of line symbols")
+            .description("Symbols used as 'end of line' in the incoming data. 
Example: \\n")
+            .required(true)
+            .defaultValue("\\n")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor VALIDATION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("validate-csv-strategy")
+            .displayName("Validation strategy")
+            .description("Strategy to apply when routing input files to output 
relationships.")
+            .required(true)
+            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
+            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, 
VALIDATE_WHOLE_FLOWFILE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_VALID = new Relationship.Builder()
+            .name("valid")
+            .description("FlowFiles that are successfully validated against 
the schema are routed to this relationship")
+            .build();
+    public static final Relationship REL_INVALID = new Relationship.Builder()
+            .name("invalid")
+            .description("FlowFiles that are not valid according to the 
specified schema are routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+    private final AtomicReference<CellProcessor[]> processors = new 
AtomicReference<CellProcessor[]>();
+    private final AtomicReference<CsvPreference> preference = new 
AtomicReference<CsvPreference>();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(SCHEMA);
+        properties.add(HEADER);
+        properties.add(DELIMITER_CHARACTER);
+        properties.add(QUOTE_CHARACTER);
+        properties.add(END_OF_LINE_CHARACTER);
+        properties.add(VALIDATION_STRATEGY);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_VALID);
+        relationships.add(REL_INVALID);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        String schema = validationContext.getProperty(SCHEMA).getValue();
+        try {
+            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
+        } catch (Exception e) {
+            final List<ValidationResult> problems = new ArrayList<>(1);
+            problems.add(new 
ValidationResult.Builder().subject(SCHEMA.getName())
+                    .input(schema)
+                    .valid(false)
+                    .explanation("Error while parsing the schema: " + 
e.getMessage())
+                    .build());
+            return problems;
+        }
+        return super.customValidate(validationContext);
+    }
+
+    @OnScheduled
+    public void setPreference(final ProcessContext context) {
+        // When going from the UI to Java, the characters are escaped so that 
what you
+        // input is transferred over to Java as is. So when you type the 
characters "\"
+        // and "n" into the UI the Java string will end up being those two 
characters
+        // not the interpreted value "\n".
+        final String msgDemarcator = 
context.getProperty(END_OF_LINE_CHARACTER).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
+        this.preference.set(new 
CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
+                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0), 
msgDemarcator).build());
+    }
+
+    /**
+     * Method used to parse the string supplied by the user. The string is 
converted
+     * to a list of cell processors used to validate the CSV data.
+     * @param schema Schema to parse
+     */
+    private void parseSchema(String schema) {
+        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
+
+        String remaining = schema;
+        while(remaining.length() > 0) {
+            remaining = setProcessor(remaining, processorsList);
+        }
+
+        this.processors.set(processorsList.toArray(new 
CellProcessor[processorsList.size()]));
+    }
+
+    private String setProcessor(String remaining, List<CellProcessor> 
processorsList) {
+        StringBuffer buffer = new StringBuffer();
+        int i = 0;
+        int opening = 0;
+        int closing = 0;
+        while(buffer.length() != remaining.length()) {
+            char c = remaining.charAt(i);
+            i++;
+
+            if(opening == 0 && c == ',') {
+                if(i == 1) {
+                    continue;
+                }
+                break;
+            }
+
+            buffer.append(c);
+
+            if(c == '(') {
+                opening++;
+            } else if(c == ')') {
+                closing++;
+            }
+
+            if(opening > 0 && opening == closing) {
+                break;
+            }
+        }
+
+        final String procString = buffer.toString().trim();
+        opening = procString.indexOf('(');
+        String method = procString;
+        String argument = null;
+        if(opening != -1) {
+            argument = method.substring(opening + 1, method.length() - 1);
+            method = method.substring(0, opening);
+        }
+
+        processorsList.add(getProcessor(method.toLowerCase(), argument));
+
+        return remaining.substring(i);
+    }
+
+    private CellProcessor getProcessor(String method, String argument) {
+        switch (method) {
+
+            case "optional":
+                int opening = argument.indexOf('(');
+                String subMethod = argument;
+                String subArgument = null;
+                if(opening != -1) {
+                    subArgument = subMethod.substring(opening + 1, 
subMethod.length() - 1);
+                    subMethod = subMethod.substring(0, opening);
+                }
+                return new Optional(getProcessor(subMethod.toLowerCase(), 
subArgument));
+
+            case "parsedate":
+                return new ParseDate(argument.substring(1, argument.length() - 
1));
+
+            case "parsedouble":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("ParseDouble does not 
expect any argument but has " + argument);
+                return new ParseDouble();
+
+            case "parsebigdecimal":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("ParseBigDecimal does 
not expect any argument but has " + argument);
+                return new ParseBigDecimal();
+
+            case "parsebool":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("ParseBool does not 
expect any argument but has " + argument);
+                return new ParseBool();
+
+            case "parsechar":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("ParseChar does not 
expect any argument but has " + argument);
+                return new ParseChar();
+
+            case "parseint":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("ParseInt does not 
expect any argument but has " + argument);
+                return new ParseInt();
+
+            case "parselong":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("ParseLong does not 
expect any argument but has " + argument);
+                return new ParseLong();
+
+            case "notnull":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("NotNull does not 
expect any argument but has " + argument);
+                return new NotNull();
+
+            case "strregex":
+                return new StrRegEx(argument.substring(1, argument.length() - 
1));
+
+            case "unique":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("Unique does not expect 
any argument but has " + argument);
+                return new Unique();
+
+            case "uniquehashcode":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("UniqueHashCode does 
not expect any argument but has " + argument);
+                return new UniqueHashCode();
+
+            case "strlen":
+                String[] splts = argument.split(",");
+                int[] requiredLengths = new int[splts.length];
+                for(int i = 0; i < splts.length; i++) {
+                    requiredLengths[i] = Integer.parseInt(splts[i]);
+                }
+                return new Strlen(requiredLengths);
+
+            case "strminmax":
+                String[] splits = argument.split(",");
+                return new StrMinMax(Long.parseLong(splits[0]), 
Long.parseLong(splits[1]));
+
+            case "lminmax":
+                String[] args = argument.split(",");
+                return new LMinMax(Long.parseLong(args[0]), 
Long.parseLong(args[1]));
+
+            case "dminmax":
+                String[] doubles = argument.split(",");
+                return new DMinMax(Double.parseDouble(doubles[0]), 
Double.parseDouble(doubles[1]));
+
+            case "equals":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("Equals does not expect 
any argument but has " + argument);
+                return new Equals();
+
+            case "forbidsubstr":
+                String[] forbiddenSubStrings = argument.replaceAll("\"", 
"").split(",[ ]*");
+                return new ForbidSubStr(forbiddenSubStrings);
+
+            case "requiresubstr":
+                String[] requiredSubStrings = argument.replaceAll("\"", 
"").split(",[ ]*");
+                return new RequireSubStr(requiredSubStrings);
+
+            case "strnotnullorempty":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("StrNotNullOrEmpty does 
not expect any argument but has " + argument);
+                return new StrNotNullOrEmpty();
+
+            case "requirehashcode":
+                String[] hashs = argument.split(",");
+                int[] hashcodes = new int[hashs.length];
+                for(int i = 0; i < hashs.length; i++) {
+                    hashcodes[i] = Integer.parseInt(hashs[i]);
+                }
+                return new RequireHashCode(hashcodes);
+
+            case "null":
+                if(argument != null && !argument.isEmpty())
+                    throw new IllegalArgumentException("Null does not expect 
any argument but has " + argument);
+                return null;
+
+            case "isincludedin":
+                String[] elements = argument.replaceAll("\"", "").split(",[ 
]*");
+                return new IsIncludedIn(elements);
+
+            default:
+                throw new IllegalArgumentException("[" + method + "] is not an 
allowed method to define a Cell Processor");
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final CsvPreference csvPref = this.preference.get();
+        final boolean header = context.getProperty(HEADER).asBoolean();
+        final ComponentLog logger = getLogger();
+        final CellProcessor[] cellProcs = this.processors.get();
+        final boolean isWholeFFValidation = 
context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
+
+        final AtomicReference<Boolean> valid = new 
AtomicReference<Boolean>(true);
+        final AtomicReference<Boolean> isFirstLineValid = new 
AtomicReference<Boolean>(true);
+        final AtomicReference<Boolean> isFirstLineInvalid = new 
AtomicReference<Boolean>(true);
+        final AtomicReference<Integer> okCount = new 
AtomicReference<Integer>(0);
+        final AtomicReference<Integer> totalCount = new 
AtomicReference<Integer>(0);
+        final AtomicReference<FlowFile> invalidFF = new 
AtomicReference<FlowFile>(null);
+        final AtomicReference<FlowFile> validFF = new 
AtomicReference<FlowFile>(null);
+
+        if(!isWholeFFValidation) {
+            invalidFF.set(session.create(flowFile));
+            validFF.set(session.create(flowFile));
+        }
+
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                NifiCsvListReader listReader = null;
+                try {
+                    listReader = new NifiCsvListReader(new 
InputStreamReader(in), csvPref);
+
+                    // handling of header
+                    if(header) {
+                        List<String> headerList = listReader.read();
+                        if(!isWholeFFValidation) {
+                            invalidFF.set(session.append(invalidFF.get(), new 
OutputStreamCallback() {
+                                @Override
+                                public void process(OutputStream out) throws 
IOException {
+                                    out.write(print(headerList, csvPref, 
isFirstLineInvalid.get()));
+                                }
+                            }));
+                            validFF.set(session.append(validFF.get(), new 
OutputStreamCallback() {
+                                @Override
+                                public void process(OutputStream out) throws 
IOException {
+                                    out.write(print(headerList, csvPref, 
isFirstLineValid.get()));
+                                }
+                            }));
+                            isFirstLineValid.set(false);
+                            isFirstLineInvalid.set(false);
+                        }
+                    }
+
+                    boolean stop = false;
+
+                    while (!stop) {
+                        try {
+
+                            final List<Object> list = 
listReader.read(cellProcs);
+                            stop = list == null;
+
+                            if(!isWholeFFValidation && !stop) {
+                                validFF.set(session.append(validFF.get(), new 
OutputStreamCallback() {
+                                    @Override
+                                    public void process(OutputStream out) 
throws IOException {
+                                        out.write(print(list, csvPref, 
isFirstLineValid.get()));
+                                    }
+                                }));
+                                okCount.set(okCount.get() + 1);
+
+                                if(isFirstLineValid.get()) {
+                                    isFirstLineValid.set(false);
+                                }
+                            }
+
+                        } catch (final SuperCsvException e) {
+                            valid.set(false);
+                            if(isWholeFFValidation) {
+                                logger.debug("Failed to validate {} against 
schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
+                                break;
+                            } else {
+                                // we append the invalid line to the flow file 
that will be routed to invalid relationship
+                                invalidFF.set(session.append(invalidFF.get(), 
new OutputStreamCallback() {
+                                    @Override
+                                    public void process(OutputStream out) 
throws IOException {
+                                        
out.write(print(e.getCsvContext().getRowSource(), csvPref, 
isFirstLineInvalid.get()));
+                                    }
+                                }));
+
+                                if(isFirstLineInvalid.get()) {
+                                    isFirstLineInvalid.set(false);
+                                }
+                            }
+                        } finally {
+                            if(!isWholeFFValidation) {
+                                totalCount.set(totalCount.get() + 1);
+                            }
+                        }
+                    }
+
+                } catch (final IOException e) {
+                    valid.set(false);
+                    logger.error("Failed to validate {} against schema due to 
{}", new Object[]{flowFile}, e);
+                } finally {
+                    if(listReader != null) {
+                        listReader.close();
+                    }
+                }
+            }
+        });
+
+        if(isWholeFFValidation) {
+            if (valid.get()) {
+                logger.debug("Successfully validated {} against schema; 
routing to 'valid'", new Object[]{flowFile});
+                session.getProvenanceReporter().route(flowFile, REL_VALID);
+                session.transfer(flowFile, REL_VALID);
+            } else {
+                session.getProvenanceReporter().route(flowFile, REL_INVALID);
+                session.transfer(flowFile, REL_INVALID);
+            }
+        } else {
+            if (valid.get()) {
+                logger.debug("Successfully validated {} against schema; 
routing to 'valid'", new Object[]{validFF.get()});
+                session.getProvenanceReporter().route(validFF.get(), 
REL_VALID, "All " + totalCount.get() + " line(s) are valid");
+                session.transfer(validFF.get(), REL_VALID);
+                session.remove(invalidFF.get());
+                session.remove(flowFile);
+            } else if (okCount.get() != 0) {
+                // because of the finally within the 'while' loop
+                totalCount.set(totalCount.get() - 1);
+
+                logger.debug("Successfully validated {}/{} line(s) in {} 
against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
+                        new Object[]{okCount.get(), totalCount.get(), 
flowFile});
+                session.getProvenanceReporter().route(validFF.get(), 
REL_VALID, okCount.get() + " valid line(s)");
+                session.transfer(validFF.get(), REL_VALID);
+                session.getProvenanceReporter().route(invalidFF.get(), 
REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
+                session.transfer(invalidFF.get(), REL_INVALID);
+                session.remove(flowFile);
+            } else {
+                logger.debug("All lines in {} are invalid; routing to 
'invalid'", new Object[]{invalidFF.get()});
+                session.getProvenanceReporter().route(invalidFF.get(), 
REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
+                session.transfer(invalidFF.get(), REL_INVALID);
+                session.remove(validFF.get());
+                session.remove(flowFile);
+            }
+        }
+    }
+
+    /**
+     * Method used to correctly write the lines by taking into account end of 
line
+     * character and separator character.
+     * @param list list of elements of the current row
+     * @param csvPref CSV preferences
+     * @param isFirstLine true if this is the first line we append
+     * @return String to append in the flow file
+     */
+    private byte[] print(List<?> list, CsvPreference csvPref, boolean 
isFirstLine) {
+        StringBuffer buffer = new StringBuffer();
+
+        if (!isFirstLine) {
+            buffer.append(csvPref.getEndOfLineSymbols());
+        }
+
+        final int size = list.size();
+        for(int i = 0; i < size; i++) {
+            buffer.append(list.get(i).toString());
+            if(i != size - 1) {
+                buffer.append((char) csvPref.getDelimiterChar());
+            }
+        }
+
+        return buffer.toString().getBytes();
+    }
+
+    /**
+     * This is required to avoid the side effect of Parse* cell processors. If 
not overriding
+     * this method, parsing will return objects and writing objects could 
result in a different
+     * output in comparison to the input.
+     */
+    private class NifiCsvListReader extends CsvListReader {
+
+        public NifiCsvListReader(Reader reader, CsvPreference preferences) {
+            super(reader, preferences);
+        }
+
+        @Override
+        public List<Object> read(CellProcessor... processors) throws 
IOException {
+            if( processors == null ) {
+                throw new NullPointerException("Processors should not be 
null");
+            }
+            if( readRow() ) {
+                super.executeProcessors(new 
ArrayList<Object>(getColumns().size()), processors);
+                return new ArrayList<Object>(getColumns());
+            }
+            return null; // EOF
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c1f9a77..716e736 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -87,5 +87,6 @@ org.apache.nifi.processors.standard.TailFile
 org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml
+org.apache.nifi.processors.standard.ValidateCsv
 org.apache.nifi.processors.standard.ExecuteSQL
 org.apache.nifi.processors.standard.FetchDistributedMapCache
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html
new file mode 100644
index 0000000..c708838
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html
@@ -0,0 +1,109 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>ValidateCsv</title>
+    <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+<!-- Processor Documentation 
================================================== -->
+<h2>Usage Information</h2>
+
+<p>
+       The Validate CSV processor is based on the super-csv library and the 
concept of 
+       <a href="http://super-csv.github.io/super-csv/cell_processors.html"; 
target="_blank">Cell Processors</a>.
+       The corresponding java documentation can be found 
+       <a 
href="http://super-csv.github.io/super-csv/apidocs/org/supercsv/cellprocessor/ift/CellProcessor.html";
 target="_blank">here</a>.
+</p>
+
+<p>
+       The cell processors cannot be nested (except with Optional which gives 
the possibility to define a CellProcessor for values
+       that could be null) and must be defined in a comma-delimited string as 
the Schema property.
+</p>
+
+<p>
+       The supported cell processors are:
+       <ul>
+               <li>ParseBigDecimal</li>
+               <li>ParseBool</li>
+               <li>ParseChar</li>
+               <li>ParseDate</li>
+               <li>ParseDouble</li>
+               <li>ParseInt</li>
+               <li>Optional</li>
+               <li>DMinMax</li>
+               <li>Equals</li>
+               <li>ForbidSubStr</li>
+               <li>LMinMax</li>
+               <li>NotNull</li>
+               <li>Null</li>
+               <li>RequireHashCode</li>
+               <li>RequireSubStr</li>
+               <li>Strlen</li>
+               <li>StrMinMax</li>
+               <li>StrNotNullOrEmpty</li>
+               <li>StrRegEx</li>
+               <li>Unique</li>
+               <li>UniqueHashCode</li>
+               <li>IsIncludedIn</li>
+       </ul>
+</p>
+
+<p>
+       Here are some examples:
+       
+       <ul>
+       <b>Schema property:</b> Null, ParseDate("dd/MM/yyyy"), 
Optional(ParseDouble())<br />
+       <b>Meaning:</b> the input CSV has three columns, the first one can be 
null and has no specification, the second one must be a date
+       formatted as expected, and the third one must a double or null (no 
value).
+       </ul>
+       
+       <ul>
+       <b>Schema property:</b> ParseBigDecimal(), ParseBool(), ParseChar(), 
ParseInt(), ParseLong()<br />
+       <b>Meaning:</b> the input CSV has five columns, the first one must be a 
big decimal, the second one must be a boolean, 
+       the third one must be a char, the fourth one must be an integer and the 
fifth one must be a long.
+       </ul>
+       
+       <ul>
+       <b>Schema property:</b> Equals(), NotNull(), StrNotNullOrEmpty()<br />
+       <b>Meaning:</b> the input CSV has three columns, all the values of the 
first column must be equal to each other, all the values
+       of the second column must be not null, and all the values of the third 
column are not null/empty string values.
+       </ul>
+       
+       <ul>
+       <b>Schema property:</b> Strlen(4), StrMinMax(3,5), 
StrRegex("[a-z0-9\\._]+@[a-z0-9\\.]+")<br />
+       <b>Meaning:</b> the input CSV has three columns, all the values of the 
first column must be 4-characters long, all the values
+       of the second column must be between 3 and 5 characters (inclusive), 
and all the values of the last column must match
+       the provided regular expression (email address).
+       </ul>
+       
+       <ul>
+       <b>Schema property:</b> Unique(), UniqueHashCode()<br />
+       <b>Meaning:</b> the input CSV has two columns. All the values of the 
first column must be unique (all the values are stored in
+       memory and this can be consuming depending of the input). All the 
values of the second column must be unique (only hash
+       codes of the input values are stored to ensure uniqueness).
+       </ul>
+       
+       <ul>
+       <b>Schema property:</b> ForbidSubStr("test", "tset"), 
RequireSubStr("test")<br />
+       <b>Meaning:</b> the input CSV has two columns. None of the values in 
the first column must contain one of the provided strings.
+       And all the values of the second column must contain the provided 
string.
+       </ul>
+</p>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
new file mode 100644
index 0000000..d3c6493
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestValidateCsv {
+
+    @Test
+    public void testHeaderAndSplit() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "true");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+        runner.setProperty(ValidateCsv.SCHEMA, "Null, 
ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
+
+        
runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
+        runner.run();
+
+        runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
+
+        runner.clearTransferState();
+
+        
runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
+        runner.run();
+
+        runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
+
+        runner.clearTransferState();
+
+        
runner.enqueue("Name,Birthdate,Weight\nJohn,22/111954,63.2\nBob,01/03/2004,45.0");
+        runner.run();
+
+        runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nBob,01/03/2004,45.0");
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/111954,63.2");
+    }
+
+    @Test
+    public void testUniqueWithSplit() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+        runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
+
+        runner.enqueue("John\r\nBob\r\nBob\r\nJohn");
+        runner.run();
+
+        runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob");
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Bob\r\nJohn");
+    }
+
+    @Test
+    public void testValidDateOptionalDouble() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        runner.setProperty(ValidateCsv.SCHEMA, "Null, 
ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
+
+        runner.enqueue("John,22/11/1954,63.2\r\nBob,01/03/2004,45.0");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testIsIncludedIn() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        runner.setProperty(ValidateCsv.SCHEMA, "Null, 
ParseDate(\"dd/MM/yyyy\"), IsIncludedIn(\"male\", \"female\")");
+
+        runner.enqueue("John,22/11/1954,male\r\nMarie,01/03/2004,female");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testBigDecimalBoolCharIntLong() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "true");
+
+        runner.setProperty(ValidateCsv.SCHEMA, "ParseBigDecimal(), 
ParseBool(), ParseChar(), ParseInt(), ParseLong()");
+
+        
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,92147483647,92147483647");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testEqualsNotNullStrNotNullOrEmpty() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        runner.setProperty(ValidateCsv.SCHEMA, "Equals(), NotNull(), 
StrNotNullOrEmpty()");
+
+        runner.enqueue("test,test,test\r\ntest,test,test");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.enqueue("test,test,test\r\ntset,test,test");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testStrlenStrMinMaxStrRegex() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        runner.setProperty(ValidateCsv.SCHEMA, "Strlen(4), StrMinMax(3,5), 
StrRegex(\"[a-z0-9\\._]+@[a-z0-9\\.]+\")");
+
+        runner.enqueue("test,test,t...@apache.org");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.enqueue("test,test,testapache.org");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testDMinMaxForbidSubStrLMinMax() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        runner.setProperty(ValidateCsv.SCHEMA, 
"DMinMax(10,100),LMinMax(10,100),ForbidSubStr(\"test\", \"tset\")");
+
+        runner.enqueue("50.001,50,hello");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.enqueue("10,10,testapache.org");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testUnique() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        runner.setProperty(ValidateCsv.SCHEMA, "Unique(), UniqueHashCode()");
+
+        runner.enqueue("1,2\r\n3,4");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.enqueue("1,2\r\n1,4");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testRequire() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        int hashcode = "test".hashCode();
+        runner.setProperty(ValidateCsv.SCHEMA, "RequireHashCode(" + hashcode + 
"), RequireSubStr(\"test\")");
+
+        runner.enqueue("test,test");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.enqueue("tset,tset");
+        runner.run();
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testValidate() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+
+        runner.setProperty(ValidateCsv.SCHEMA, "RequireSubString(\"test\")");
+        runner.assertNotValid();
+
+        runner.setProperty(ValidateCsv.SCHEMA, "''");
+        runner.assertNotValid();
+
+        runner.setProperty(ValidateCsv.SCHEMA, "\"\"");
+        runner.assertNotValid();
+    }
+
+}

Reply via email to