Repository: nifi
Updated Branches:
  refs/heads/master 7963df89e -> 0aa4b7267


Added Expression Language (EL) support to CSV properties

Clean up checkstyle errors

Added unit tests, fixed typo

This closes #709

Signed-off-by: Matt Burgess <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0aa4b726
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0aa4b726
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0aa4b726

Branch: refs/heads/master
Commit: 0aa4b72678a3543f2b02a484d9ba7da7997b9a34
Parents: 7963df8
Author: Simon Elliston Ball <[email protected]>
Authored: Fri Jul 22 20:53:22 2016 +0100
Committer: Matt Burgess <[email protected]>
Committed: Tue Feb 14 11:19:35 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/kite/ConvertCSVToAvro.java  | 30 +++---
 .../processors/kite/TestCSVToAvroProcessor.java | 99 +++++++++++---------
 2 files changed, 73 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0aa4b726/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 22244ee..9702916 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -77,7 +77,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
                 .subject(subject)
                 .input(input)
                 .explanation("Only non-null single characters are supported")
-                .valid(input.length() == 1 && input.charAt(0) != 0)
+                .valid((input.length() == 1 && input.charAt(0) != 0) || context.isExpressionLanguagePresent(input))
                 .build();
         }
     };
@@ -111,6 +111,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
         .name("CSV charset")
         .description("Character set for CSV files")
         .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .expressionLanguageSupported(true)
         .defaultValue(DEFAULTS.charset)
         .build();
 
@@ -119,6 +120,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
         .name("CSV delimiter")
         .description("Delimiter character for CSV records")
         .addValidator(CHAR_VALIDATOR)
+        .expressionLanguageSupported(true)
         .defaultValue(DEFAULTS.delimiter)
         .build();
 
@@ -127,6 +129,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
         .name("CSV quote character")
         .description("Quote character for CSV values")
         .addValidator(CHAR_VALIDATOR)
+        .expressionLanguageSupported(true)
         .defaultValue(DEFAULTS.quote)
         .build();
 
@@ -135,6 +138,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
         .name("CSV escape character")
         .description("Escape character for CSV values")
         .addValidator(CHAR_VALIDATOR)
+        .expressionLanguageSupported(true)
         .defaultValue(DEFAULTS.escape)
         .build();
 
@@ -143,6 +147,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
         .name("Use CSV header line")
         .description("Whether to use the first line as a header")
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .expressionLanguageSupported(true)
         .defaultValue(String.valueOf(DEFAULTS.useHeader))
         .build();
 
@@ -151,6 +156,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
         .name("Lines to skip")
         .description("Number of lines to skip before reading header or data")
         .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
+        .expressionLanguageSupported(true)
         .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
         .build();
 
@@ -172,10 +178,6 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
         .add(INCOMPATIBLE)
         .build();
 
-    // Immutable configuration
-    @VisibleForTesting
-    volatile CSVProperties props;
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
@@ -189,15 +191,6 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
     @OnScheduled
     public void createCSVProperties(ProcessContext context) throws IOException {
         super.setDefaultConfiguration(context);
-
-        this.props = new CSVProperties.Builder()
-            .charset(context.getProperty(CHARSET).getValue())
-            .delimiter(context.getProperty(DELIMITER).getValue())
-            .quote(context.getProperty(QUOTE).getValue())
-            .escape(context.getProperty(ESCAPE).getValue())
-            .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
-            .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
-            .build();
     }
 
     @Override
@@ -208,6 +201,15 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
             return;
         }
 
+        CSVProperties props = new CSVProperties.Builder()
+                .charset(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingCSV).getValue())
+                .delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(incomingCSV).getValue())
+                .quote(context.getProperty(QUOTE).evaluateAttributeExpressions(incomingCSV).getValue())
+                .escape(context.getProperty(ESCAPE).evaluateAttributeExpressions(incomingCSV).getValue())
+                .hasHeader(context.getProperty(HAS_HEADER).evaluateAttributeExpressions(incomingCSV).asBoolean())
+                .linesToSkip(context.getProperty(LINES_TO_SKIP).evaluateAttributeExpressions(incomingCSV).asInteger())
+                .build();
+
         String schemaProperty = context.getProperty(SCHEMA)
             .evaluateAttributeExpressions(incomingCSV)
             .getValue();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0aa4b726/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 9252e81..8bad01c 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -18,12 +18,19 @@
  */
 package org.apache.nifi.processors.kite;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
-import org.apache.nifi.processor.ProcessContext;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
 import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -32,6 +39,9 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestCSVToAvroProcessor {
 
@@ -220,57 +230,40 @@ public class TestCSVToAvroProcessor {
     }
 
     @Test
-    public void testCSVProperties() throws IOException {
+    public void testBasicConversionNoErrors() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
-        ConvertCSVToAvro processor = new ConvertCSVToAvro();
-        ProcessContext context = runner.getProcessContext();
-
-        // check defaults
-        processor.createCSVProperties(context);
-        Assert.assertEquals("Charset should match",
-                "utf8", processor.props.charset);
-        Assert.assertEquals("Delimiter should match",
-                ",", processor.props.delimiter);
-        Assert.assertEquals("Quote should match",
-                "\"", processor.props.quote);
-        Assert.assertEquals("Escape should match",
-                "\\", processor.props.escape);
-        Assert.assertEquals("Header flag should match",
-                false, processor.props.useHeader);
-        Assert.assertEquals("Lines to skip should match",
-                0, processor.props.linesToSkip);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
 
-        runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
-        runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
-        runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
-        runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
-        runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
-        runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
-
-        // check updates
-        processor.createCSVProperties(context);
-        Assert.assertEquals("Charset should match",
-                "utf16", processor.props.charset);
-        Assert.assertEquals("Delimiter should match",
-                "|", processor.props.delimiter);
-        Assert.assertEquals("Quote should match",
-                "'", processor.props.quote);
-        Assert.assertEquals("Escape should match",
-                "\u2603", processor.props.escape);
-        Assert.assertEquals("Header flag should match",
-                true, processor.props.useHeader);
-        Assert.assertEquals("Lines to skip should match",
-                2, processor.props.linesToSkip);
+        runner.enqueue(streamFor("1,green\n2,blue,\n3,grey,12.95"));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 3 rows", 3, converted);
+        Assert.assertEquals("Should reject 0 row", 0, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 0);
     }
 
     @Test
-    public void testBasicConversionNoErrors() throws IOException {
+    public void testExpressionLanguageBasedCSVProperties() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
         runner.assertNotValid();
         runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
         runner.assertValid();
 
-        runner.enqueue(streamFor("1,green\n2,blue,\n3,grey,12.95"));
+        runner.setProperty(ConvertCSVToAvro.DELIMITER, "${csv.delimiter}");
+        runner.setProperty(ConvertCSVToAvro.QUOTE, "${csv.quote}");
+
+        HashMap<String, String> flowFileAttributes = new HashMap<String,String>();
+        flowFileAttributes.put("csv.delimiter", "|");
+        flowFileAttributes.put("csv.quote", "~");
+
+        runner.enqueue(streamFor("1|green\n2|~blue|field~|\n3|grey|12.95"), flowFileAttributes);
         runner.run();
 
         long converted = runner.getCounterValue("Converted records");
@@ -281,5 +274,27 @@ public class TestCSVToAvroProcessor {
         runner.assertTransferCount("success", 1);
         runner.assertTransferCount("failure", 0);
         runner.assertTransferCount("incompatible", 0);
+
+        final InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship("success").get(0).toByteArray());
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+            assertTrue(dataFileReader.hasNext());
+            GenericRecord record = dataFileReader.next();
+            assertEquals(1L, record.get("id"));
+            assertEquals("green", record.get("color").toString());
+            assertNull(record.get("price"));
+
+            assertTrue(dataFileReader.hasNext());
+            record = dataFileReader.next();
+            assertEquals(2L, record.get("id"));
+            assertEquals("blue|field", record.get("color").toString());
+            assertNull(record.get("price"));
+
+            assertTrue(dataFileReader.hasNext());
+            record = dataFileReader.next();
+            assertEquals(3L, record.get("id"));
+            assertEquals("grey", record.get("color").toString());
+            assertEquals(12.95, record.get("price"));
+        }
     }
 }

Reply via email to