This is an automated email from the ASF dual-hosted git repository.
mosermw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0190374e56 NIFI-12674 Modified ValidateCsv to make the schema optional
if a header is provided. Added an option to validate a FlowFile attribute instead of the FlowFile content.
0190374e56 is described below
commit 0190374e56541032b00401b529d78ede53c025f8
Author: Freedom9339 <[email protected]>
AuthorDate: Tue Feb 6 14:30:55 2024 +0000
NIFI-12674 Modified ValidateCsv to make the schema optional if a header is
provided. Added an option to validate a FlowFile attribute instead of the FlowFile content.
This closes #8362
Signed-off-by: Mike Moser <[email protected]>
---
.../nifi/processors/standard/ValidateCsv.java | 270 +++++++++++----------
.../nifi/processors/standard/TestValidateCsv.java | 104 ++++++++
2 files changed, 247 insertions(+), 127 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index fdcfac3ead..8077905357 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -36,8 +36,6 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseBigDecimal;
@@ -67,14 +65,16 @@ import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;
import org.supercsv.util.CsvContext;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStream;
import java.io.Reader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -82,7 +82,7 @@ import java.util.concurrent.atomic.AtomicReference;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"csv", "schema", "validation"})
-@CapabilityDescription("Validates the contents of FlowFiles against a
user-specified CSV schema. " +
+@CapabilityDescription("Validates the contents of FlowFiles or a FlowFile
attribute value against a user-specified CSV schema. " +
"Take a look at the additional documentation of this processor for
some schema examples.")
@WritesAttributes({
@WritesAttribute(attribute = "count.valid.lines", description = "If line
by line validation, number of valid lines extracted from the source data"),
@@ -116,8 +116,8 @@ public class ValidateCsv extends AbstractProcessor {
.displayName("Schema")
.description("The schema to be used for validation. Is expected a
comma-delimited string representing the cell "
+ "processors to apply. The following cell processors are
allowed in the schema definition: "
- + ALLOWED_OPERATORS + ". Note: cell processors cannot be
nested except with Optional.")
- .required(true)
+ + ALLOWED_OPERATORS + ". Note: cell processors cannot be
nested except with Optional. Schema is required if Header is false.")
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
@@ -172,6 +172,16 @@ public class ValidateCsv extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor CSV_SOURCE_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("CSV Source Attribute")
+ .displayName("CSV Source Attribute")
+ .description("The name of the attribute containing CSV data to be
validated. If this property is blank, the FlowFile content will be validated.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+ .dependsOn(VALIDATION_STRATEGY, VALIDATE_WHOLE_FLOWFILE.getValue())
+ .build();
+
public static final PropertyDescriptor INCLUDE_ALL_VIOLATIONS = new
PropertyDescriptor.Builder()
.name("validate-csv-violations")
.displayName("Include all violations")
@@ -187,6 +197,7 @@ public class ValidateCsv extends AbstractProcessor {
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA,
+ CSV_SOURCE_ATTRIBUTE,
HEADER,
DELIMITER_CHARACTER,
QUOTE_CHARACTER,
@@ -201,7 +212,8 @@ public class ValidateCsv extends AbstractProcessor {
.build();
public static final Relationship REL_INVALID = new Relationship.Builder()
.name("invalid")
- .description("FlowFiles that are not valid according to the
specified schema are routed to this relationship")
+ .description("FlowFiles that are not valid according to the
specified schema,"
+ + " or no schema or CSV header can be identified, are
routed to this relationship")
.build();
private static final Set<Relationship> RELATIONSHIPS = Set.of(
@@ -223,6 +235,7 @@ public class ValidateCsv extends AbstractProcessor {
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
PropertyValue schemaProp = context.getProperty(SCHEMA);
+ PropertyValue headerProp = context.getProperty(HEADER);
String schema = schemaProp.getValue();
String subject = SCHEMA.getName();
@@ -231,7 +244,11 @@ public class ValidateCsv extends AbstractProcessor {
}
// If no Expression Language is present, try parsing the schema
try {
- this.parseSchema(schema);
+ if (schema != null) {
+ this.parseSchema(schema);
+ } else if (!headerProp.asBoolean()) {
+ throw(new Exception("Schema cannot be empty if Header property
is false."));
+ }
} catch (Exception e) {
final List<ValidationResult> problems = new ArrayList<>(1);
problems.add(new ValidationResult.Builder().subject(subject)
@@ -449,155 +466,154 @@ public class ValidateCsv extends AbstractProcessor {
final CsvPreference csvPref = getPreference(context, flowFile);
final boolean header = context.getProperty(HEADER).asBoolean();
final ComponentLog logger = getLogger();
- final String schema =
context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
- final CellProcessor[] cellProcs = this.parseSchema(schema);
- final boolean isWholeFFValidation =
context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
+ String schema =
context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
+ CellProcessor[] cellProcs = null;
+ if (schema != null) {
+ cellProcs = this.parseSchema(schema);
+ }
+ final String validationStrategy =
context.getProperty(VALIDATION_STRATEGY).getValue();
+ final boolean isWholeFFValidation =
!validationStrategy.equals(VALIDATE_LINES_INDIVIDUALLY.getValue());
final boolean includeAllViolations =
context.getProperty(INCLUDE_ALL_VIOLATIONS).asBoolean();
- final AtomicReference<Boolean> valid = new AtomicReference<>(true);
+ boolean valid = true;
+ int okCount = 0;
+ int totalCount = 0;
+ FlowFile invalidFF = null;
+ FlowFile validFF = null;
+ String validationError = null;
final AtomicReference<Boolean> isFirstLineValid = new
AtomicReference<>(true);
final AtomicReference<Boolean> isFirstLineInvalid = new
AtomicReference<>(true);
- final AtomicReference<Integer> okCount = new AtomicReference<>(0);
- final AtomicReference<Integer> totalCount = new AtomicReference<>(0);
- final AtomicReference<FlowFile> invalidFF = new
AtomicReference<>(null);
- final AtomicReference<FlowFile> validFF = new AtomicReference<>(null);
- final AtomicReference<String> validationError = new
AtomicReference<>(null);
if (!isWholeFFValidation) {
- invalidFF.set(session.create(flowFile));
- validFF.set(session.create(flowFile));
+ invalidFF = session.create(flowFile);
+ validFF = session.create(flowFile);
+ }
+
+ InputStream stream;
+ if (context.getProperty(CSV_SOURCE_ATTRIBUTE).isSet() &&
isWholeFFValidation) {
+ String csvAttribute =
flowFile.getAttribute(context.getProperty(CSV_SOURCE_ATTRIBUTE).evaluateAttributeExpressions().getValue());
+ stream = new
ByteArrayInputStream(Objects.requireNonNullElse(csvAttribute,
"").getBytes(StandardCharsets.UTF_8));
+ } else {
+ stream = session.read(flowFile);
}
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- try (final NifiCsvListReader listReader = new
NifiCsvListReader(new InputStreamReader(in), csvPref)) {
-
- // handling of header
- if (header) {
-
- // read header
- listReader.read();
-
- if (!isWholeFFValidation) {
- invalidFF.set(session.append(invalidFF.get(), new
OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws
IOException {
-
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
- }
- }));
- validFF.set(session.append(validFF.get(), new
OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws
IOException {
-
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
- }
- }));
+ stream: try (final NifiCsvListReader listReader = new
NifiCsvListReader(new InputStreamReader(stream), csvPref)) {
+
+ // handling of header
+ if (header) {
+
+ // read header
+ List<String> headers = listReader.read();
+
+ if (schema == null) {
+ if (headers != null && !headers.isEmpty()) {
+ String newSchema =
"Optional(StrNotNullOrEmpty()),".repeat(headers.size());
+ schema = newSchema.substring(0, newSchema.length() -
1);
+ cellProcs = this.parseSchema(schema);
+ } else {
+ validationError = "No schema or CSV header could be
identified.";
+ valid = false;
+ break stream;
+ }
+ }
+
+ if (!isWholeFFValidation) {
+ invalidFF = session.append(invalidFF, out ->
out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
+ validFF = session.append(validFF, out ->
out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
+ isFirstLineValid.set(false);
+ isFirstLineInvalid.set(false);
+ }
+ }
+
+ boolean stop = false;
+
+ while (!stop) {
+ try {
+
+ // read next row and check if no more row
+ stop = listReader.read(includeAllViolations && valid,
cellProcs) == null;
+
+ if (!isWholeFFValidation && !stop) {
+ validFF = session.append(validFF, out ->
out.write(print(listReader.getUntokenizedRow(), csvPref,
isFirstLineValid.get())));
+ okCount++;
+
+ if (isFirstLineValid.get()) {
isFirstLineValid.set(false);
- isFirstLineInvalid.set(false);
}
}
+ } catch (final SuperCsvException e) {
+ valid = false;
+ if (isWholeFFValidation) {
+ validationError = e.getLocalizedMessage();
+ logger.debug("Failed to validate {} against schema due
to {}; routing to 'invalid'", flowFile, e);
+ break;
+ } else {
+ // we append the invalid line to the flow file that
will be routed to invalid relationship
+ invalidFF = session.append(invalidFF, out ->
out.write(print(listReader.getUntokenizedRow(), csvPref,
isFirstLineInvalid.get())));
- boolean stop = false;
-
- while (!stop) {
- try {
-
- // read next row and check if no more row
- stop = listReader.read(includeAllViolations &&
valid.get(), cellProcs) == null;
-
- if (!isWholeFFValidation && !stop) {
- validFF.set(session.append(validFF.get(), new
OutputStreamCallback() {
- @Override
- public void process(OutputStream out)
throws IOException {
-
out.write(print(listReader.getUntokenizedRow(), csvPref,
isFirstLineValid.get()));
- }
- }));
- okCount.set(okCount.get() + 1);
-
- if (isFirstLineValid.get()) {
- isFirstLineValid.set(false);
- }
- }
-
- } catch (final SuperCsvException e) {
- valid.set(false);
- if (isWholeFFValidation) {
- validationError.set(e.getLocalizedMessage());
- logger.debug("Failed to validate {} against
schema due to {}; routing to 'invalid'", flowFile, e);
- break;
- } else {
- // we append the invalid line to the flow file
that will be routed to invalid relationship
- invalidFF.set(session.append(invalidFF.get(),
new OutputStreamCallback() {
- @Override
- public void process(OutputStream out)
throws IOException {
-
out.write(print(listReader.getUntokenizedRow(), csvPref,
isFirstLineInvalid.get()));
- }
- }));
-
- if (isFirstLineInvalid.get()) {
- isFirstLineInvalid.set(false);
- }
-
- if (validationError.get() == null) {
-
validationError.set(e.getLocalizedMessage());
- }
- }
- } finally {
- if (!isWholeFFValidation) {
- totalCount.set(totalCount.get() + 1);
- }
+ if (isFirstLineInvalid.get()) {
+ isFirstLineInvalid.set(false);
}
- }
- } catch (final IOException e) {
- valid.set(false);
- logger.error("Failed to validate {} against schema due to
{}", flowFile, e);
+ if (validationError == null) {
+ validationError = e.getLocalizedMessage();
+ }
+ }
+ } finally {
+ if (!isWholeFFValidation) {
+ totalCount++;
+ }
}
}
- });
+
+ } catch (final IOException e) {
+ valid = false;
+ logger.error("Failed to validate {} against schema due to {}",
flowFile, e);
+ }
if (isWholeFFValidation) {
- if (valid.get()) {
+ if (valid) {
logger.debug("Successfully validated {} against schema;
routing to 'valid'", flowFile);
session.getProvenanceReporter().route(flowFile, REL_VALID);
session.transfer(flowFile, REL_VALID);
} else {
session.getProvenanceReporter().route(flowFile, REL_INVALID);
- session.putAttribute(flowFile, "validation.error.message",
validationError.get());
+ session.putAttribute(flowFile, "validation.error.message",
validationError);
session.transfer(flowFile, REL_INVALID);
}
} else {
- if (valid.get()) {
- logger.debug("Successfully validated {} against schema;
routing to 'valid'", validFF.get());
- session.getProvenanceReporter().route(validFF.get(),
REL_VALID, "All " + totalCount.get() + " line(s) are valid");
- session.putAttribute(validFF.get(), "count.valid.lines",
Integer.toString(totalCount.get()));
- session.putAttribute(validFF.get(), "count.total.lines",
Integer.toString(totalCount.get()));
- session.transfer(validFF.get(), REL_VALID);
- session.remove(invalidFF.get());
+ if (valid) {
+ logger.debug("Successfully validated {} against schema;
routing to 'valid'", validFF);
+ session.getProvenanceReporter().route(validFF, REL_VALID, "All
" + totalCount + " line(s) are valid");
+ session.putAttribute(validFF, "count.valid.lines",
Integer.toString(totalCount));
+ session.putAttribute(validFF, "count.total.lines",
Integer.toString(totalCount));
+ session.transfer(validFF, REL_VALID);
+ session.remove(invalidFF);
session.remove(flowFile);
- } else if (okCount.get() != 0) {
+ } else if (okCount != 0) {
// because of the finally within the 'while' loop
- totalCount.set(totalCount.get() - 1);
-
- logger.debug("Successfully validated {}/{} line(s) in {}
against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
okCount.get(), totalCount.get(), flowFile);
- session.getProvenanceReporter().route(validFF.get(),
REL_VALID, okCount.get() + " valid line(s)");
- session.putAttribute(validFF.get(), "count.total.lines",
Integer.toString(totalCount.get()));
- session.putAttribute(validFF.get(), "count.valid.lines",
Integer.toString(okCount.get()));
- session.transfer(validFF.get(), REL_VALID);
- session.getProvenanceReporter().route(invalidFF.get(),
REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
- session.putAttribute(invalidFF.get(), "count.invalid.lines",
Integer.toString((totalCount.get() - okCount.get())));
- session.putAttribute(invalidFF.get(), "count.total.lines",
Integer.toString(totalCount.get()));
- session.putAttribute(invalidFF.get(),
"validation.error.message", validationError.get());
- session.transfer(invalidFF.get(), REL_INVALID);
+ totalCount--;
+
+ logger.debug("Successfully validated {}/{} line(s) in {}
against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
+ okCount, totalCount, flowFile);
+ session.getProvenanceReporter().route(validFF, REL_VALID,
okCount + " valid line(s)");
+ session.putAttribute(validFF, "count.total.lines",
Integer.toString(totalCount));
+ session.putAttribute(validFF, "count.valid.lines",
Integer.toString(okCount));
+ session.transfer(validFF, REL_VALID);
+ session.getProvenanceReporter().route(invalidFF, REL_INVALID,
(totalCount - okCount) + " invalid line(s)");
+ session.putAttribute(invalidFF, "count.invalid.lines",
Integer.toString((totalCount - okCount)));
+ session.putAttribute(invalidFF, "count.total.lines",
Integer.toString(totalCount));
+ session.putAttribute(invalidFF, "validation.error.message",
validationError);
+ session.transfer(invalidFF, REL_INVALID);
session.remove(flowFile);
} else {
- logger.debug("All lines in {} are invalid; routing to
'invalid'", invalidFF.get());
- session.getProvenanceReporter().route(invalidFF.get(),
REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
- session.putAttribute(invalidFF.get(), "count.invalid.lines",
Integer.toString(totalCount.get()));
- session.putAttribute(invalidFF.get(), "count.total.lines",
Integer.toString(totalCount.get()));
- session.putAttribute(invalidFF.get(),
"validation.error.message", validationError.get());
- session.transfer(invalidFF.get(), REL_INVALID);
- session.remove(validFF.get());
+ logger.debug("All lines in {} are invalid; routing to
'invalid'", invalidFF);
+ session.getProvenanceReporter().route(invalidFF, REL_INVALID,
"All " + totalCount + " line(s) are invalid");
+ session.putAttribute(invalidFF, "count.invalid.lines",
Integer.toString(totalCount));
+ session.putAttribute(invalidFF, "count.total.lines",
Integer.toString(totalCount));
+ session.putAttribute(invalidFF, "validation.error.message",
validationError);
+ session.transfer(invalidFF, REL_INVALID);
+ session.remove(validFF);
session.remove(flowFile);
}
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index 2423db913d..c518bdfd29 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -16,10 +16,14 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
public class TestValidateCsv {
@Test
@@ -164,6 +168,106 @@ public class TestValidateCsv {
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
+ @Test
+ public void testNoSchema() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+
+
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.clearTransferState();
+ runner.enqueue(new byte[0]);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testValidateOnAttribute() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+ runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+ final Map<String, String> attributeMap = new HashMap<>();
+ attributeMap.put("CSV_ATTRIBUTE",
"bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+
+ runner.enqueue("FlowFile Random Data", attributeMap);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile
Random Data");
+ }
+
+ @Test
+ public void testValidateOnAttributeDoesNotExist() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+ runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
+ runner.setProperty(ValidateCsv.SCHEMA,
"ParseInt(),ParseInt(),ParseInt()");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+ final Map<String, String> attributeMap = new HashMap<>();
+ attributeMap.put("CSV_ATTRIBUTE_BAD",
"bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+
+ runner.enqueue("FlowFile Random Data", attributeMap);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile
Random Data");
+
+ runner.clearTransferState();
+ attributeMap.clear();
+ attributeMap.put("CSV_ATTRIBUTE", "");
+ runner.enqueue("FlowFile Random Data", attributeMap);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile
Random Data");
+ }
+
+ @Test
+ public void testValidateOnAttributeDoesNotExistNoSchema() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+ runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+ final Map<String, String> attributeMap = new HashMap<>();
+ attributeMap.put("CSV_ATTRIBUTE_BAD",
"bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+
+ runner.enqueue("FlowFile Random Data", attributeMap);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1);
+ MockFlowFile flowfile =
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst();
+ flowfile.assertAttributeEquals("validation.error.message",
+ "No schema or CSV header could be identified.");
+ flowfile.assertContentEquals("FlowFile Random Data");
+ }
+
+ @Test
+ public void testValidateEmptyFile() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+ runner.setProperty(ValidateCsv.SCHEMA,
"ParseInt(),ParseInt(),ParseInt()");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());
+ final Map<String, String> attributeMap = new HashMap<>();
+
+ runner.enqueue(new byte[0], attributeMap);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+ }
+
@Test
public void testEqualsNotNullStrNotNullOrEmpty() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());