This is an automated email from the ASF dual-hosted git repository.

mosermw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0190374e56 NIFI-12674 Modified ValidateCSV to make the schema optional 
if a header is provided. Added validate on attribute option.
0190374e56 is described below

commit 0190374e56541032b00401b529d78ede53c025f8
Author: Freedom9339 <[email protected]>
AuthorDate: Tue Feb 6 14:30:55 2024 +0000

    NIFI-12674 Modified ValidateCSV to make the schema optional if a header is 
provided. Added validate on attribute option.
    
    This closes #8362
    Signed-off-by: Mike Moser <[email protected]>
---
 .../nifi/processors/standard/ValidateCsv.java      | 270 +++++++++++----------
 .../nifi/processors/standard/TestValidateCsv.java  | 104 ++++++++
 2 files changed, 247 insertions(+), 127 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index fdcfac3ead..8077905357 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -36,8 +36,6 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.supercsv.cellprocessor.Optional;
 import org.supercsv.cellprocessor.ParseBigDecimal;
@@ -67,14 +65,16 @@ import org.supercsv.io.CsvListReader;
 import org.supercsv.prefs.CsvPreference;
 import org.supercsv.util.CsvContext;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.io.Reader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -82,7 +82,7 @@ import java.util.concurrent.atomic.AtomicReference;
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"csv", "schema", "validation"})
-@CapabilityDescription("Validates the contents of FlowFiles against a 
user-specified CSV schema. " +
+@CapabilityDescription("Validates the contents of FlowFiles or a FlowFile 
attribute value against a user-specified CSV schema. " +
         "Take a look at the additional documentation of this processor for 
some schema examples.")
 @WritesAttributes({
     @WritesAttribute(attribute = "count.valid.lines", description = "If line 
by line validation, number of valid lines extracted from the source data"),
@@ -116,8 +116,8 @@ public class ValidateCsv extends AbstractProcessor {
             .displayName("Schema")
             .description("The schema to be used for validation. Is expected a 
comma-delimited string representing the cell "
                     + "processors to apply. The following cell processors are 
allowed in the schema definition: "
-                    + ALLOWED_OPERATORS + ". Note: cell processors cannot be 
nested except with Optional.")
-            .required(true)
+                    + ALLOWED_OPERATORS + ". Note: cell processors cannot be 
nested except with Optional. Schema is required if Header is false.")
+            .required(false)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
@@ -172,6 +172,16 @@ public class ValidateCsv extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor CSV_SOURCE_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("CSV Source Attribute")
+            .displayName("CSV Source Attribute")
+            .description("The name of the attribute containing CSV data to be 
validated. If this property is blank, the FlowFile content will be validated.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .dependsOn(VALIDATION_STRATEGY, VALIDATE_WHOLE_FLOWFILE.getValue())
+            .build();
+
     public static final PropertyDescriptor INCLUDE_ALL_VIOLATIONS = new 
PropertyDescriptor.Builder()
             .name("validate-csv-violations")
             .displayName("Include all violations")
@@ -187,6 +197,7 @@ public class ValidateCsv extends AbstractProcessor {
 
     private static final List<PropertyDescriptor> PROPERTIES = List.of(
             SCHEMA,
+            CSV_SOURCE_ATTRIBUTE,
             HEADER,
             DELIMITER_CHARACTER,
             QUOTE_CHARACTER,
@@ -201,7 +212,8 @@ public class ValidateCsv extends AbstractProcessor {
             .build();
     public static final Relationship REL_INVALID = new Relationship.Builder()
             .name("invalid")
-            .description("FlowFiles that are not valid according to the 
specified schema are routed to this relationship")
+            .description("FlowFiles that are not valid according to the 
specified schema,"
+                    + " or no schema or CSV header can be identified, are 
routed to this relationship")
             .build();
 
     private static final Set<Relationship> RELATIONSHIPS = Set.of(
@@ -223,6 +235,7 @@ public class ValidateCsv extends AbstractProcessor {
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
 
         PropertyValue schemaProp = context.getProperty(SCHEMA);
+        PropertyValue headerProp = context.getProperty(HEADER);
         String schema = schemaProp.getValue();
         String subject = SCHEMA.getName();
 
@@ -231,7 +244,11 @@ public class ValidateCsv extends AbstractProcessor {
         }
         // If no Expression Language is present, try parsing the schema
         try {
-            this.parseSchema(schema);
+            if (schema != null) {
+                this.parseSchema(schema);
+            } else if (!headerProp.asBoolean()) {
+                throw(new Exception("Schema cannot be empty if Header property 
is false."));
+            }
         } catch (Exception e) {
             final List<ValidationResult> problems = new ArrayList<>(1);
             problems.add(new ValidationResult.Builder().subject(subject)
@@ -449,155 +466,154 @@ public class ValidateCsv extends AbstractProcessor {
         final CsvPreference csvPref = getPreference(context, flowFile);
         final boolean header = context.getProperty(HEADER).asBoolean();
         final ComponentLog logger = getLogger();
-        final String schema = 
context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
-        final CellProcessor[] cellProcs = this.parseSchema(schema);
-        final boolean isWholeFFValidation = 
context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
+        String schema = 
context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
+        CellProcessor[] cellProcs = null;
+        if (schema != null) {
+            cellProcs = this.parseSchema(schema);
+        }
+        final String validationStrategy = 
context.getProperty(VALIDATION_STRATEGY).getValue();
+        final boolean isWholeFFValidation = 
!validationStrategy.equals(VALIDATE_LINES_INDIVIDUALLY.getValue());
         final boolean includeAllViolations = 
context.getProperty(INCLUDE_ALL_VIOLATIONS).asBoolean();
 
-        final AtomicReference<Boolean> valid = new AtomicReference<>(true);
+        boolean valid = true;
+        int okCount = 0;
+        int totalCount = 0;
+        FlowFile invalidFF = null;
+        FlowFile validFF = null;
+        String validationError = null;
         final AtomicReference<Boolean> isFirstLineValid = new 
AtomicReference<>(true);
         final AtomicReference<Boolean> isFirstLineInvalid = new 
AtomicReference<>(true);
-        final AtomicReference<Integer> okCount = new AtomicReference<>(0);
-        final AtomicReference<Integer> totalCount = new AtomicReference<>(0);
-        final AtomicReference<FlowFile> invalidFF = new 
AtomicReference<>(null);
-        final AtomicReference<FlowFile> validFF = new AtomicReference<>(null);
-        final AtomicReference<String> validationError = new 
AtomicReference<>(null);
 
         if (!isWholeFFValidation) {
-            invalidFF.set(session.create(flowFile));
-            validFF.set(session.create(flowFile));
+            invalidFF = session.create(flowFile);
+            validFF = session.create(flowFile);
+        }
+
+        InputStream stream;
+        if (context.getProperty(CSV_SOURCE_ATTRIBUTE).isSet() && 
isWholeFFValidation) {
+            String csvAttribute = 
flowFile.getAttribute(context.getProperty(CSV_SOURCE_ATTRIBUTE).evaluateAttributeExpressions().getValue());
+            stream = new 
ByteArrayInputStream(Objects.requireNonNullElse(csvAttribute, 
"").getBytes(StandardCharsets.UTF_8));
+        } else {
+            stream = session.read(flowFile);
         }
 
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                try (final NifiCsvListReader listReader = new 
NifiCsvListReader(new InputStreamReader(in), csvPref)) {
-
-                    // handling of header
-                    if (header) {
-
-                        // read header
-                        listReader.read();
-
-                        if (!isWholeFFValidation) {
-                            invalidFF.set(session.append(invalidFF.get(), new 
OutputStreamCallback() {
-                                @Override
-                                public void process(OutputStream out) throws 
IOException {
-                                    
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
-                                }
-                            }));
-                            validFF.set(session.append(validFF.get(), new 
OutputStreamCallback() {
-                                @Override
-                                public void process(OutputStream out) throws 
IOException {
-                                    
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
-                                }
-                            }));
+        stream: try (final NifiCsvListReader listReader = new 
NifiCsvListReader(new InputStreamReader(stream), csvPref)) {
+
+            // handling of header
+            if (header) {
+
+                // read header
+                List<String> headers = listReader.read();
+
+                if (schema == null) {
+                    if (headers != null && !headers.isEmpty()) {
+                        String newSchema = 
"Optional(StrNotNullOrEmpty()),".repeat(headers.size());
+                        schema = newSchema.substring(0, newSchema.length() - 
1);
+                        cellProcs = this.parseSchema(schema);
+                    } else {
+                        validationError = "No schema or CSV header could be 
identified.";
+                        valid = false;
+                        break stream;
+                    }
+                }
+
+                if (!isWholeFFValidation) {
+                    invalidFF = session.append(invalidFF, out -> 
out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
+                    validFF = session.append(validFF, out -> 
out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
+                    isFirstLineValid.set(false);
+                    isFirstLineInvalid.set(false);
+                }
+            }
+
+            boolean stop = false;
+
+            while (!stop) {
+                try {
+
+                    // read next row and check if no more row
+                    stop = listReader.read(includeAllViolations && valid, 
cellProcs) == null;
+
+                    if (!isWholeFFValidation && !stop) {
+                        validFF = session.append(validFF, out -> 
out.write(print(listReader.getUntokenizedRow(), csvPref, 
isFirstLineValid.get())));
+                        okCount++;
+
+                        if (isFirstLineValid.get()) {
                             isFirstLineValid.set(false);
-                            isFirstLineInvalid.set(false);
                         }
                     }
+                } catch (final SuperCsvException e) {
+                    valid = false;
+                    if (isWholeFFValidation) {
+                        validationError = e.getLocalizedMessage();
+                        logger.debug("Failed to validate {} against schema due 
to {}; routing to 'invalid'", flowFile, e);
+                        break;
+                    } else {
+                        // we append the invalid line to the flow file that 
will be routed to invalid relationship
+                        invalidFF = session.append(invalidFF, out -> 
out.write(print(listReader.getUntokenizedRow(), csvPref, 
isFirstLineInvalid.get())));
 
-                    boolean stop = false;
-
-                    while (!stop) {
-                        try {
-
-                            // read next row and check if no more row
-                            stop = listReader.read(includeAllViolations && 
valid.get(), cellProcs) == null;
-
-                            if (!isWholeFFValidation && !stop) {
-                                validFF.set(session.append(validFF.get(), new 
OutputStreamCallback() {
-                                    @Override
-                                    public void process(OutputStream out) 
throws IOException {
-                                        
out.write(print(listReader.getUntokenizedRow(), csvPref, 
isFirstLineValid.get()));
-                                    }
-                                }));
-                                okCount.set(okCount.get() + 1);
-
-                                if (isFirstLineValid.get()) {
-                                    isFirstLineValid.set(false);
-                                }
-                            }
-
-                        } catch (final SuperCsvException e) {
-                            valid.set(false);
-                            if (isWholeFFValidation) {
-                                validationError.set(e.getLocalizedMessage());
-                                logger.debug("Failed to validate {} against 
schema due to {}; routing to 'invalid'", flowFile, e);
-                                break;
-                            } else {
-                                // we append the invalid line to the flow file 
that will be routed to invalid relationship
-                                invalidFF.set(session.append(invalidFF.get(), 
new OutputStreamCallback() {
-                                    @Override
-                                    public void process(OutputStream out) 
throws IOException {
-                                        
out.write(print(listReader.getUntokenizedRow(), csvPref, 
isFirstLineInvalid.get()));
-                                    }
-                                }));
-
-                                if (isFirstLineInvalid.get()) {
-                                    isFirstLineInvalid.set(false);
-                                }
-
-                                if (validationError.get() == null) {
-                                    
validationError.set(e.getLocalizedMessage());
-                                }
-                            }
-                        } finally {
-                            if (!isWholeFFValidation) {
-                                totalCount.set(totalCount.get() + 1);
-                            }
+                        if (isFirstLineInvalid.get()) {
+                            isFirstLineInvalid.set(false);
                         }
-                    }
 
-                } catch (final IOException e) {
-                    valid.set(false);
-                    logger.error("Failed to validate {} against schema due to 
{}", flowFile, e);
+                        if (validationError == null) {
+                            validationError = e.getLocalizedMessage();
+                        }
+                    }
+                } finally {
+                    if (!isWholeFFValidation) {
+                        totalCount++;
+                    }
                 }
             }
-        });
+
+        } catch (final IOException e) {
+            valid = false;
+            logger.error("Failed to validate {} against schema due to {}", 
flowFile, e);
+        }
 
         if (isWholeFFValidation) {
-            if (valid.get()) {
+            if (valid) {
                 logger.debug("Successfully validated {} against schema; 
routing to 'valid'", flowFile);
                 session.getProvenanceReporter().route(flowFile, REL_VALID);
                 session.transfer(flowFile, REL_VALID);
             } else {
                 session.getProvenanceReporter().route(flowFile, REL_INVALID);
-                session.putAttribute(flowFile, "validation.error.message", 
validationError.get());
+                session.putAttribute(flowFile, "validation.error.message", 
validationError);
                 session.transfer(flowFile, REL_INVALID);
             }
         } else {
-            if (valid.get()) {
-                logger.debug("Successfully validated {} against schema; 
routing to 'valid'", validFF.get());
-                session.getProvenanceReporter().route(validFF.get(), 
REL_VALID, "All " + totalCount.get() + " line(s) are valid");
-                session.putAttribute(validFF.get(), "count.valid.lines", 
Integer.toString(totalCount.get()));
-                session.putAttribute(validFF.get(), "count.total.lines", 
Integer.toString(totalCount.get()));
-                session.transfer(validFF.get(), REL_VALID);
-                session.remove(invalidFF.get());
+            if (valid) {
+                logger.debug("Successfully validated {} against schema; 
routing to 'valid'", validFF);
+                session.getProvenanceReporter().route(validFF, REL_VALID, "All 
" + totalCount + " line(s) are valid");
+                session.putAttribute(validFF, "count.valid.lines", 
Integer.toString(totalCount));
+                session.putAttribute(validFF, "count.total.lines", 
Integer.toString(totalCount));
+                session.transfer(validFF, REL_VALID);
+                session.remove(invalidFF);
                 session.remove(flowFile);
-            } else if (okCount.get() != 0) {
+            } else if (okCount != 0) {
                 // because of the finally within the 'while' loop
-                totalCount.set(totalCount.get() - 1);
-
-                logger.debug("Successfully validated {}/{} line(s) in {} 
against schema; routing valid lines to 'valid' and invalid lines to 'invalid'", 
okCount.get(), totalCount.get(), flowFile);
-                session.getProvenanceReporter().route(validFF.get(), 
REL_VALID, okCount.get() + " valid line(s)");
-                session.putAttribute(validFF.get(), "count.total.lines", 
Integer.toString(totalCount.get()));
-                session.putAttribute(validFF.get(), "count.valid.lines", 
Integer.toString(okCount.get()));
-                session.transfer(validFF.get(), REL_VALID);
-                session.getProvenanceReporter().route(invalidFF.get(), 
REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
-                session.putAttribute(invalidFF.get(), "count.invalid.lines", 
Integer.toString((totalCount.get() - okCount.get())));
-                session.putAttribute(invalidFF.get(), "count.total.lines", 
Integer.toString(totalCount.get()));
-                session.putAttribute(invalidFF.get(), 
"validation.error.message", validationError.get());
-                session.transfer(invalidFF.get(), REL_INVALID);
+                totalCount--;
+
+                logger.debug("Successfully validated {}/{} line(s) in {} 
against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
+                        okCount, totalCount, flowFile);
+                session.getProvenanceReporter().route(validFF, REL_VALID, 
okCount + " valid line(s)");
+                session.putAttribute(validFF, "count.total.lines", 
Integer.toString(totalCount));
+                session.putAttribute(validFF, "count.valid.lines", 
Integer.toString(okCount));
+                session.transfer(validFF, REL_VALID);
+                session.getProvenanceReporter().route(invalidFF, REL_INVALID, 
(totalCount - okCount) + " invalid line(s)");
+                session.putAttribute(invalidFF, "count.invalid.lines", 
Integer.toString((totalCount - okCount)));
+                session.putAttribute(invalidFF, "count.total.lines", 
Integer.toString(totalCount));
+                session.putAttribute(invalidFF, "validation.error.message", 
validationError);
+                session.transfer(invalidFF, REL_INVALID);
                 session.remove(flowFile);
             } else {
-                logger.debug("All lines in {} are invalid; routing to 
'invalid'", invalidFF.get());
-                session.getProvenanceReporter().route(invalidFF.get(), 
REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
-                session.putAttribute(invalidFF.get(), "count.invalid.lines", 
Integer.toString(totalCount.get()));
-                session.putAttribute(invalidFF.get(), "count.total.lines", 
Integer.toString(totalCount.get()));
-                session.putAttribute(invalidFF.get(), 
"validation.error.message", validationError.get());
-                session.transfer(invalidFF.get(), REL_INVALID);
-                session.remove(validFF.get());
+                logger.debug("All lines in {} are invalid; routing to 
'invalid'", invalidFF);
+                session.getProvenanceReporter().route(invalidFF, REL_INVALID, 
"All " + totalCount + " line(s) are invalid");
+                session.putAttribute(invalidFF, "count.invalid.lines", 
Integer.toString(totalCount));
+                session.putAttribute(invalidFF, "count.total.lines", 
Integer.toString(totalCount));
+                session.putAttribute(invalidFF, "validation.error.message", 
validationError);
+                session.transfer(invalidFF, REL_INVALID);
+                session.remove(validFF);
                 session.remove(flowFile);
             }
         }
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index 2423db913d..c518bdfd29 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -16,10 +16,14 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class TestValidateCsv {
 
     @Test
@@ -164,6 +168,106 @@ public class TestValidateCsv {
         runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
     }
 
+    @Test
+    public void testNoSchema() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "true");
+
+        
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+        runner.clearTransferState();
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1);
+    }
+
+    @Test
+    public void testValidateOnAttribute() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "true");
+        runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+        final Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("CSV_ATTRIBUTE", 
"bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+
+        runner.enqueue("FlowFile Random Data", attributeMap);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile
 Random Data");
+    }
+
+    @Test
+    public void testValidateOnAttributeDoesNotExist() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "true");
+        runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
+        runner.setProperty(ValidateCsv.SCHEMA, 
"ParseInt(),ParseInt(),ParseInt()");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+        final Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("CSV_ATTRIBUTE_BAD", 
"bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+
+        runner.enqueue("FlowFile Random Data", attributeMap);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile
 Random Data");
+
+        runner.clearTransferState();
+        attributeMap.clear();
+        attributeMap.put("CSV_ATTRIBUTE", "");
+        runner.enqueue("FlowFile Random Data", attributeMap);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+        
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile
 Random Data");
+    }
+
+    @Test
+    public void testValidateOnAttributeDoesNotExistNoSchema() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "true");
+        runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+        final Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("CSV_ATTRIBUTE_BAD", 
"bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+
+        runner.enqueue("FlowFile Random Data", attributeMap);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1);
+        MockFlowFile flowfile = 
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst();
+        flowfile.assertAttributeEquals("validation.error.message",
+                "No schema or CSV header could be identified.");
+        flowfile.assertContentEquals("FlowFile Random Data");
+    }
+
+    @Test
+    public void testValidateEmptyFile() {
+        final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.HEADER, "true");
+        runner.setProperty(ValidateCsv.SCHEMA, 
"ParseInt(),ParseInt(),ParseInt()");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+        final Map<String, String> attributeMap = new HashMap<>();
+
+        runner.enqueue(new byte[0], attributeMap);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+    }
+
     @Test
     public void testEqualsNotNullStrNotNullOrEmpty() {
         final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());

Reply via email to