Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70718795
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
 ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a 
user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    /** Names of the cell processors that are accepted in the schema property (listed in its description). */
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    /** Required property holding the comma-delimited list of cell processors to apply. */
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            // Trailing space after "representing" keeps the concatenated sentence readable.
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing " +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /** Required boolean property: whether the incoming flow file starts with a header line to ignore. */
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    /** Required property: quote character of the incoming data; defaults to a double quote. */
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /** Required property: delimiter character of the incoming data; defaults to a semicolon. */
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ;")
    +            .required(true)
    +            .defaultValue(";")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Required property: end-of-line symbols of the incoming data.
    +     * The default is the two-character text backslash + 'n', not an actual line feed.
    +     */
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /** Relationship for FlowFiles whose content passed validation against the schema. */
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    /** Relationship for FlowFiles whose content does not conform to the schema. */
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    // Populated once in init() and exposed through the getters below.
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    // Cell processors produced by parseSchema(); the list is replaced on every parse.
    +    private final AtomicReference<List<CellProcessor>> processors = new AtomicReference<>();
    +    // CSV preferences (quote, delimiter, end of line) built in setPreference().
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<>();
    +
    +    /**
    +     * Builds the unmodifiable property list and relationship set exposed by this processor.
    +     *
    +     * @param context initialization context; not read by this method
    +     */
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        // Order matters for the list: it is the order in which the properties were declared to be shown.
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>(Arrays.asList(
    +                SCHEMA, HEADER, DELIMITER_CHARACTER, QUOTE_CHARACTER, END_OF_LINE_CHARACTER));
    +        this.properties = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> routes = new HashSet<>(Arrays.asList(REL_VALID, REL_INVALID));
    +        this.relationships = Collections.unmodifiableSet(routes);
    +    }
    +
    +    /**
    +     * @return the unmodifiable set of relationships built in {@code init}
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    /**
    +     * @return the unmodifiable list of property descriptors built in {@code init}
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return this.properties;
    +    }
    +
    +    /**
    +     * Validates the schema property by attempting to parse it.
    +     * Note that parsing replaces the content of {@code this.processors} as a side effect.
    +     *
    +     * @param validationContext context giving access to the configured property values
    +     * @return a single invalid result describing the parse error, or the result of the
    +     *         parent implementation when the schema parses without error
    +     */
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            // Reuse the value read above instead of looking the property up a second time.
    +            this.parseSchema(schema);
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    /**
    +     * Builds the CSV preferences from the quote, delimiter and end-of-line properties
    +     * and stores them in {@code this.preference}.
    +     *
    +     * @param context process context used to read the configured property values
    +     */
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        // Only the first character of the quote and delimiter values is used.
    +        final char quote = context.getProperty(QUOTE_CHARACTER).getValue().charAt(0);
    +        final char delimiter = context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0);
    +        // NOTE(review): the value is passed through as-is, so the default is the two characters
    +        // backslash + 'n' rather than a line feed; confirm whether it should be unescaped here.
    +        final String endOfLine = context.getProperty(END_OF_LINE_CHARACTER).getValue();
    +        this.preference.set(new CsvPreference.Builder(quote, delimiter, endOfLine).build());
    +    }
    +
    +    /**
    +     * Converts the user-supplied schema string into the list of cell processors
    +     * used to validate the CSV data. The previously stored list is discarded first.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        this.processors.set(new ArrayList<CellProcessor>());
    +        // Each call consumes one cell processor definition and returns what is left to parse.
    +        for (String rest = schema; rest.length() > 0; ) {
    +            rest = setProcessor(rest);
    +        }
    +    }
    +
    +    /**
    +     * Extracts the first cell processor definition from the given schema fragment,
    +     * appends the matching processor to {@code this.processors} and returns the
    +     * part of the fragment that has not been consumed yet.
    +     *
    +     * A definition ends at a comma found outside parentheses, or right after the
    +     * parenthesis that balances the first opening one. A single leading comma
    +     * (left over after a parenthesized definition) is skipped.
    +     *
    +     * @param remaining schema fragment still to be parsed
    +     * @return the fragment that follows the consumed definition (possibly empty)
    +     */
    +    private String setProcessor(String remaining) {
    +        final StringBuilder buffer = new StringBuilder();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        // Bound the scan by the read index. Comparing the buffer length with the input length
    +        // never terminates once a leading comma was skipped, and charAt() then runs past the
    +        // end of the string (e.g. for "Strlen(4),ParseInt").
    +        while(i < remaining.length()) {
    +            final char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    // separator left in front of this definition: ignore it
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        final int parenIndex = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(parenIndex != -1) {
    +            // text between the first '(' and the last character (expected to be the closing ')')
    +            argument = procString.substring(parenIndex + 1, procString.length() - 1);
    +            method = procString.substring(0, parenIndex);
    +        }
    +
    +        // Locale.ROOT keeps the lookup key stable whatever the JVM default locale is.
    +        this.processors.get().add(getProcessor(method.toLowerCase(java.util.Locale.ROOT), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +        case "optional":
    --- End diff --
    
    Nitpick: the `case` labels should be indented one level inside the `switch` statement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to