This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 590fa20 NIFI-4892 - ValidateCSV: no doublequote escaping in invalid output
590fa20 is described below
commit 590fa2063cd915123c5f46dac07890ff7607ffb0
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Feb 20 10:36:31 2018 +0100
NIFI-4892 - ValidateCSV: no doublequote escaping in invalid output
NIFI-4892 - ValidateCSV: no doublequote escaping in invalid output
NIFI-5907 - unit test
This closes #2481.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../nifi/processors/standard/ValidateCsv.java | 52 ++++++----------------
.../nifi/processors/standard/TestValidateCsv.java | 50 ++++++++++++++++++++-
2 files changed, 62 insertions(+), 40 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index 9796822..bd6a0e7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -466,24 +466,25 @@ public class ValidateCsv extends AbstractProcessor {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
- NifiCsvListReader listReader = null;
- try {
- listReader = new NifiCsvListReader(new
InputStreamReader(in), csvPref);
+ try(final NifiCsvListReader listReader = new
NifiCsvListReader(new InputStreamReader(in), csvPref)) {
// handling of header
if(header) {
- List<String> headerList = listReader.read();
+
+ // read header
+ listReader.read();
+
if(!isWholeFFValidation) {
invalidFF.set(session.append(invalidFF.get(), new
OutputStreamCallback() {
@Override
public void process(OutputStream out) throws
IOException {
- out.write(print(headerList, csvPref,
isFirstLineInvalid.get()));
+
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
validFF.set(session.append(validFF.get(), new
OutputStreamCallback() {
@Override
public void process(OutputStream out) throws
IOException {
- out.write(print(headerList, csvPref,
isFirstLineValid.get()));
+
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
isFirstLineValid.set(false);
@@ -496,14 +497,14 @@ public class ValidateCsv extends AbstractProcessor {
while (!stop) {
try {
- final List<Object> list =
listReader.read(cellProcs);
- stop = list == null;
+ // read next row and check if no more row
+ stop = listReader.read(cellProcs) == null;
if(!isWholeFFValidation && !stop) {
validFF.set(session.append(validFF.get(), new
OutputStreamCallback() {
@Override
public void process(OutputStream out)
throws IOException {
- out.write(print(list, csvPref,
isFirstLineValid.get()));
+
out.write(print(listReader.getUntokenizedRow(), csvPref,
isFirstLineValid.get()));
}
}));
okCount.set(okCount.get() + 1);
@@ -524,7 +525,7 @@ public class ValidateCsv extends AbstractProcessor {
invalidFF.set(session.append(invalidFF.get(),
new OutputStreamCallback() {
@Override
public void process(OutputStream out)
throws IOException {
-
out.write(print(e.getCsvContext().getRowSource(), csvPref,
isFirstLineInvalid.get()));
+
out.write(print(listReader.getUntokenizedRow(), csvPref,
isFirstLineInvalid.get()));
}
}));
@@ -546,10 +547,6 @@ public class ValidateCsv extends AbstractProcessor {
} catch (final IOException e) {
valid.set(false);
logger.error("Failed to validate {} against schema due to
{}", new Object[]{flowFile}, e);
- } finally {
- if(listReader != null) {
- listReader.close();
- }
}
}
});
@@ -602,35 +599,12 @@ public class ValidateCsv extends AbstractProcessor {
}
}
- /**
- * Method used to correctly write the lines by taking into account end of
line
- * character and separator character.
- * @param list list of elements of the current row
- * @param csvPref CSV preferences
- * @param isFirstLine true if this is the first line we append
- * @return String to append in the flow file
- */
- private byte[] print(List<?> list, CsvPreference csvPref, boolean
isFirstLine) {
+ private byte[] print(String row, CsvPreference csvPref, boolean
isFirstLine) {
StringBuffer buffer = new StringBuffer();
-
if (!isFirstLine) {
buffer.append(csvPref.getEndOfLineSymbols());
}
-
- final int size = list.size();
- int i = 0;
- for (Object item : list) {
- if (item != null) {
- buffer.append(item.toString());
- }
-
- if (i < size - 1) {
- buffer.append((char) csvPref.getDelimiterChar());
- }
- i++;
- }
-
- return buffer.toString().getBytes();
+ return buffer.append(row).toString().getBytes();
}
/**
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index b03aed4..c694ab1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -75,7 +75,7 @@ public class TestValidateCsv {
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
-
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,,63.2\nBob,,45.0");
+
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,\"\",63.2\nBob,,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
@@ -337,4 +337,52 @@ public class TestValidateCsv {
runner.assertTransferCount(ValidateCsv.REL_VALID, 2);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
+
+ @Test
+ public void testEscapingLineByLine() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+ final String row =
"Header1,\"Header2,escaped\",Header3\r\nField1,\"Field2,escaped\",Field3";
+ runner.setProperty(ValidateCsv.SCHEMA,
"ParseInt(),ParseInt(),ParseInt()");
+
+ runner.enqueue(row);
+ runner.run(1);
+
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals(row);
+ runner.clearTransferState();
+
+ runner.setProperty(ValidateCsv.SCHEMA, "null,null,null");
+ runner.enqueue(row);
+ runner.run(1);
+
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
+
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals(row);
+ }
+
+ @Test
+ public void testQuote() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+ runner.setProperty(ValidateCsv.SCHEMA, "NotNull(), NotNull(),
NotNull()");
+
+ runner.enqueue("Header 1, Header 2, Header 3\n\"Content 1a, Content
1b\", Content 2, Content 3");
+ runner.run();
+
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Header
1, Header 2, Header 3\n\"Content 1a, Content 1b\", Content 2, Content 3");
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
+ }
}