NIFI-1156

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8966643d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8966643d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8966643d

Branch: refs/heads/master
Commit: 8966643d483d57ec9ee5154210049d4bccf177d6
Parents: 8d37af0
Author: Jeremy Dyer <[email protected]>
Authored: Tue Jan 5 20:53:48 2016 -0500
Committer: Jeremy Dyer <[email protected]>
Committed: Mon Jan 25 10:05:18 2016 -0500

----------------------------------------------------------------------
 .../processors/kite/InferAvroSchemaFromCSV.java | 234 +++++++++++++++++++
 .../kite/InferAvroSchemaFromJSON.java           | 164 +++++++++++++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../kite/TestInferAvroSchemaFromCSV.java        | 129 ++++++++++
 .../kite/TestInferAvroSchemaFromJSON.java       | 215 +++++++++++++++++
 5 files changed, 744 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8966643d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
new file mode 100644
index 0000000..80a8b92
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.kitesdk.data.spi.filesystem.CSVProperties;
+import org.kitesdk.data.spi.filesystem.CSVUtil;
+import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
+
+@Tags({"kite", "csv", "avro", "infer", "schema"})
+@SeeAlso({InferAvroSchemaFromCSV.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Creates an Avro schema from a CSV file header. The 
header line definition can either be provided" +
+        "as a property to the processor OR present in the first line of CSV in 
the incoming FlowFile content. If a header" +
+        " property is specified for this processor no attempt will be made to 
use the header line that may be present" +
+        " in the incoming CSV FlowFile content.")
+public class InferAvroSchemaFromCSV
+        extends AbstractKiteProcessor {
+
+    public static final String CSV_DELIMITER = ",";
+
+    public static final PropertyDescriptor HEADER_LINE = new 
PropertyDescriptor.Builder()
+            .name("CSV Header Line")
+            .description("Comma separated string defining the column names 
expected in the CSV data. " +
+                    "EX: \"fname,lname,zip,address\"")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new 
PropertyDescriptor.Builder()
+            .name("CSV Header Line Skip Count")
+            .description("Specifies the number of header lines that should be 
skipped when reading the CSV data. If the " +
+                    " first line of the CSV data is a header line and you 
specify the \"CSV Header Line\" property " +
+                    "you need to set this vlaue to 1 otherwise the header line 
will be treated as actual data.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ESCAPE_STRING = new 
PropertyDescriptor.Builder()
+            .name("CSV escape string")
+            .description("String that represents an escape sequence in the CSV 
FlowFile content data.")
+            .required(true)
+            .defaultValue("\\")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor QUOTE_STRING = new 
PropertyDescriptor.Builder()
+            .name("CSV quote string")
+            .description("String that represents a literal quote character in 
the CSV FlowFile content data.")
+            .required(true)
+            .defaultValue("'")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_NAME = new 
PropertyDescriptor.Builder()
+            .name("Avro Record Name")
+            .description("Value to be placed in the Avro record schema 
\"name\" field.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Charset")
+            .description("Character encoding of CSV data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new 
PropertyDescriptor.Builder()
+            .name("Pretty Avro Output")
+            .description("If true the Avro output will be formatted.")
+            .required(true)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("Successfully created Avro schema for CSV 
data.").build();
+
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("Original incoming FlowFile CSV data").build();
+
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("Failed to create Avro schema for CSV data.").build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HEADER_LINE);
+        properties.add(HEADER_LINE_SKIP_COUNT);
+        properties.add(ESCAPE_STRING);
+        properties.add(QUOTE_STRING);
+        properties.add(PRETTY_AVRO_OUTPUT);
+        properties.add(RECORD_NAME);
+        properties.add(CHARSET);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, ProcessSession 
session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        try {
+
+            //Determines the header line either from the property input or the 
first line of the delimited file.
+            final AtomicReference<String> header = new AtomicReference<>();
+            final AtomicReference<Boolean> hasHeader = new AtomicReference<>();
+
+            if (context.getProperty(HEADER_LINE).isSet()) {
+                header.set(context.getProperty(HEADER_LINE).getValue());
+                hasHeader.set(Boolean.FALSE);
+            } else {
+                //Read the first line of the file to get the header value.
+                session.read(original, new InputStreamCallback() {
+                    @Override
+                    public void process(InputStream in) throws IOException {
+                        BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
+                        header.set(br.readLine());
+                        hasHeader.set(Boolean.TRUE);
+                        br.close();
+                    }
+                });
+            }
+
+            //Prepares the CSVProperties for kite
+            final CSVProperties props = new CSVProperties.Builder()
+                    .delimiter(CSV_DELIMITER)
+                    .escape(context.getProperty(ESCAPE_STRING).getValue())
+                    .quote(context.getProperty(QUOTE_STRING).getValue())
+                    .header(header.get())
+                    .hasHeader(hasHeader.get())
+                    
.linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).asInteger())
+                    .charset(context.getProperty(CHARSET).getValue())
+                    .build();
+
+            final Set<String> required = ImmutableSet.of();
+            final AtomicReference<String> avroSchema = new AtomicReference<>();
+
+            session.read(original, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    avroSchema.set(CSVUtil
+                            .inferNullableSchema(
+                                    
context.getProperty(RECORD_NAME).getValue(), in, props, required)
+                            
.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
+                }
+            });
+
+            FlowFile avroSchemaFF = session.write(session.create(), new 
OutputStreamCallback() {
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    out.write(avroSchema.get().getBytes());
+                }
+            });
+
+            //Transfer the sessions.
+            session.transfer(original, REL_ORIGINAL);
+            session.transfer(avroSchemaFF, REL_SUCCESS);
+
+        } catch (Exception ex) {
+            getLogger().error(ex.getMessage());
+            session.transfer(original, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8966643d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
new file mode 100644
index 0000000..77029ff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.kitesdk.data.spi.JsonUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"kite", "json", "avro", "infer", "schema"})
+@SeeAlso({InferAvroSchemaFromCSV.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Creates an Avro schema from JSON data. The Avro schema is inferred by examining the fields " +
+        "in the JSON input. The Avro schema generated by kite will use the same names present in the incoming JSON payload")
+/**
+ * Infers an Avro schema from JSON FlowFile content using Kite's {@link JsonUtil}.
+ *
+ * <p>The incoming FlowFile is routed to {@link #REL_ORIGINAL} and a new FlowFile containing the
+ * inferred schema (as JSON text) is routed to {@link #REL_SUCCESS}. Any failure routes the
+ * incoming FlowFile to {@link #REL_FAILURE} and discards the partially built schema FlowFile.</p>
+ */
+public class InferAvroSchemaFromJSON
+        extends AbstractKiteProcessor {
+
+    /** Encoding used for the emitted schema text (Avro schemas are JSON documents). */
+    private static final String SCHEMA_OUTPUT_CHARSET = "UTF-8";
+
+    public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
+            .name("Avro Record Name")
+            .description("Value to be placed in the Avro record schema \"name\" field.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Restricted to positive values: analysing zero or a negative number of records cannot yield a schema.
+    public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder()
+            .name("Number of records to analyze")
+            .description("Number of records that should be analyzed by kite to infer the Avro schema")
+            .required(true)
+            .defaultValue("10")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    // NOTE(review): this property is exposed but not read by onTrigger below; the raw
+    // InputStream is handed to JsonUtil.inferSchema as-is. Confirm whether it should be dropped.
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Charset")
+            .description("Character encoding of JSON data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
+            .name("Pretty Avro Output")
+            .description("If true the Avro output will be formatted.")
+            .required(true)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successfully created Avro schema for JSON data.").build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("Original incoming FlowFile JSON data").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("Failed to create Avro schema for JSON data.").build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    /**
+     * Builds the immutable property and relationship collections exposed by this processor.
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(CHARSET);
+        properties.add(PRETTY_AVRO_OUTPUT);
+        properties.add(RECORD_NAME);
+        properties.add(NUM_RECORDS_TO_ANALYZE);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    /**
+     * Infers an Avro schema for one incoming JSON FlowFile.
+     *
+     * @param context provides the configured property values
+     * @param session used to read the incoming FlowFile and create the schema FlowFile
+     * @throws ProcessException declared by the processor contract; failures seen here are routed to "failure"
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        // Declared outside the try block so that a schema FlowFile created before a failure
+        // can be removed from the session instead of being left without a relationship.
+        FlowFile avroSchemaFF = null;
+
+        try {
+            final String recordName = context.getProperty(RECORD_NAME).getValue();
+            final int recordsToAnalyze = context.getProperty(NUM_RECORDS_TO_ANALYZE).asInteger();
+            final boolean prettyOutput = context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean();
+
+            final AtomicReference<String> avroSchema = new AtomicReference<>();
+            session.read(original, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    final Schema inferred = JsonUtil.inferSchema(in, recordName, recordsToAnalyze);
+                    avroSchema.set(inferred.toString(prettyOutput));
+                }
+            });
+
+            avroSchemaFF = session.create();
+            avroSchemaFF = session.write(avroSchemaFF, new OutputStreamCallback() {
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    out.write(avroSchema.get().getBytes(SCHEMA_OUTPUT_CHARSET));
+                }
+            });
+
+            //Transfer the FlowFiles
+            session.transfer(original, REL_ORIGINAL);
+            session.transfer(avroSchemaFF, REL_SUCCESS);
+
+        } catch (Exception ex) {
+            // Pass the exception itself so the stack trace is not lost.
+            getLogger().error("Failed to infer Avro schema for " + original + " due to " + ex, ex);
+            if (avroSchemaFF != null) {
+                session.remove(avroSchemaFF);
+            }
+            session.transfer(original, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8966643d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ea99ff6..7a89856 100644
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,3 +16,5 @@ org.apache.nifi.processors.kite.StoreInKiteDataset
 org.apache.nifi.processors.kite.ConvertCSVToAvro
 org.apache.nifi.processors.kite.ConvertJSONToAvro
 org.apache.nifi.processors.kite.ConvertAvroSchema
+org.apache.nifi.processors.kite.InferAvroSchemaFromCSV
+org.apache.nifi.processors.kite.InferAvroSchemaFromJSON

http://git-wip-us.apache.org/repos/asf/nifi/blob/8966643d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
new file mode 100644
index 0000000..78c4eab
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Exercises {@link InferAvroSchemaFromCSV} with the header taken from the content,
+ * with the header taken from the processor property, and with empty content.
+ */
+public class TestInferAvroSchemaFromCSV {
+
+    private static final String CSV_HEADER_LINE = "fname,lname,age,zip";
+
+    /** A header line followed by a single data record. */
+    private static final String CSV_CONTENT = CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555";
+
+    private static final String TEST_CHARSET = "UTF-8";
+
+    /**
+     * Creates a runner for the processor and applies the properties shared by every test.
+     *
+     * @param headerLine  value for the "CSV Header Line" property, or {@code null} to leave it unset
+     * @param linesToSkip value for the "CSV Header Line Skip Count" property
+     * @return a runner whose configuration has been asserted valid
+     */
+    private static TestRunner newConfiguredRunner(final String headerLine, final String linesToSkip) {
+        final TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
+
+        // A fresh runner is invalid because the required record name has no default value.
+        runner.assertNotValid();
+        if (headerLine != null) {
+            runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, headerLine);
+        }
+        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, linesToSkip);
+        runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
+        runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
+        runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
+        runner.setProperty(InferAvroSchemaFromCSV.CHARSET, TEST_CHARSET);
+        runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
+        runner.assertValid();
+
+        return runner;
+    }
+
+    @Test
+    public void inferSchemaFromHeaderLineOfCSV() throws Exception {
+        // No header property: the header must be picked up from the first content line.
+        TestRunner runner = newConfiguredRunner(null, "0");
+
+        ProcessSession session = runner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.write(session.create(), new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write(CSV_CONTENT.getBytes(TEST_CHARSET));
+            }
+        });
+
+        //Enqueue the CSV FlowFile (header line plus one record)
+        runner.enqueue(ff);
+        runner.run();
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void inferSchemaFormHeaderLinePropertyOfProcessor() throws Exception {
+        // Header supplied as a property; the header line in the content is skipped.
+        TestRunner runner = newConfiguredRunner(CSV_HEADER_LINE, "1");
+
+        ProcessSession session = runner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.write(session.create(), new StreamCallback() {
+            @Override
+            public void process(InputStream in, OutputStream out) throws IOException {
+                out.write(CSV_CONTENT.getBytes(TEST_CHARSET));
+            }
+        });
+
+        //Enqueue the CSV FlowFile (header line plus one record)
+        runner.enqueue(ff);
+        runner.run();
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void inferSchemaFromEmptyContent() throws Exception  {
+        TestRunner runner = newConfiguredRunner(CSV_HEADER_LINE, "1");
+
+        ProcessSession session = runner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.write(session.create(), new StreamCallback() {
+            @Override
+            public void process(InputStream in, OutputStream out) throws IOException {
+                out.write("".getBytes(TEST_CHARSET));
+            }
+        });
+
+        //Enqueue the empty FlowFile; with no records to inspect it must be routed to failure
+        runner.enqueue(ff);
+        runner.run();
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 1);
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 0);
+        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8966643d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
new file mode 100644
index 0000000..1c63ba1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+
+public class TestInferAvroSchemaFromJSON {
+
+    /**
+     * Schema of the records built by dataBasic: required string "id" and
+     * "primaryColor", optional string "secondaryColor" and "price".
+     */
+    public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
+            .fields().requiredString("id").requiredString("primaryColor")
+            .optionalString("secondaryColor").optionalString("price")
+            .endRecord();
+
+    /**
+     * Schema of the records built by convertBasic: required long "id",
+     * required string "color" and optional double "price".
+     */
+    public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
+            .fields().requiredLong("id").requiredString("color")
+            .optionalDouble("price").endRecord();
+
+    // Looks like a JSON source/target field mapping; it is not referenced by any
+    // test in this class.
+    public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]";
+
+    /** Value testBasicConversion expects in the "errors" attribute of the failure FlowFile. */
+    public static final String FAILURE_SUMMARY = "Cannot convert free to double";
+
+    /**
+     * Runs ConvertAvroSchema over three records built with dataBasic and checks
+     * that the two convertible records are written to "success" in order, while
+     * the record whose price is "free" is written unchanged to "failure" with the
+     * expected "errors" attribute.
+     *
+     * NOTE(review): this class is named TestInferAvroSchemaFromJSON but the runner
+     * is created for ConvertAvroSchema -- confirm that this is intended.
+     *
+     * @throws IOException if reading the resulting Avro content fails
+     */
+    @Test
+    public void testBasicConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
+                INPUT_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
+                OUTPUT_SCHEMA.toString());
+        runner.setProperty("primaryColor", "color");
+        runner.assertValid();
+
+        // Two valid rows, and one invalid because "free" is not a double.
+        GenericData.Record goodRecord1 = dataBasic("1", "blue", null, null);
+        GenericData.Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
+        GenericData.Record badRecord = dataBasic("3", "red", "yellow", "free");
+        List<GenericData.Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
+                badRecord);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 rows", 1, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 1);
+
+        // The failure FlowFile is read back with the input schema.
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship(
+                "failure").get(0);
+        GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>(
+                INPUT_SCHEMA);
+        DataFileStream<GenericData.Record> stream = new DataFileStream<GenericData.Record>(
+                new ByteArrayInputStream(
+                        runner.getContentAsByteArray(incompatible)), reader);
+        int count = 0;
+        try {
+            for (GenericData.Record r : stream) {
+                Assert.assertEquals(badRecord, r);
+                count++;
+            }
+        } finally {
+            // Close the stream even when an assertion above fails.
+            stream.close();
+        }
+        Assert.assertEquals(1, count);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+
+        // The success FlowFile is read back with the output schema.
+        GenericDatumReader<GenericData.Record> successReader = new GenericDatumReader<GenericData.Record>(
+                OUTPUT_SCHEMA);
+        DataFileStream<GenericData.Record> successStream = new DataFileStream<GenericData.Record>(
+                new ByteArrayInputStream(runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0))),
+                successReader);
+        count = 0;
+        try {
+            for (GenericData.Record r : successStream) {
+                // Records are expected in the order they were enqueued.
+                if (count == 0) {
+                    Assert.assertEquals(convertBasic(goodRecord1), r);
+                } else {
+                    Assert.assertEquals(convertBasic(goodRecord2), r);
+                }
+                count++;
+            }
+        } finally {
+            // Close the stream even when an assertion above fails.
+            successStream.close();
+        }
+        Assert.assertEquals(2, count);
+    }
+
+    /**
+     * Runs ConvertAvroSchema with TestAvroRecordConverter.NESTED_RECORD_SCHEMA as
+     * input schema, TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA as output schema
+     * and the property "parent.id" set to "parentId", then checks that both
+     * records are written to "success" in order and none to "failure".
+     *
+     * @throws IOException if reading the resulting Avro content fails
+     */
+    @Test
+    public void testNestedConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
+                TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
+        runner.setProperty("parent.id", "parentId");
+        runner.assertValid();
+
+        // Two valid rows
+        GenericData.Record goodRecord1 = dataNested(1L, "200", null, null);
+        GenericData.Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany");
+        List<GenericData.Record> input = Lists.newArrayList(goodRecord1, goodRecord2);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 0 rows", 0, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+
+        // The success FlowFile is read back with the unnested output schema.
+        GenericDatumReader<GenericData.Record> successReader = new GenericDatumReader<GenericData.Record>(
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
+        DataFileStream<GenericData.Record> successStream = new DataFileStream<GenericData.Record>(
+                new ByteArrayInputStream(runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0))),
+                successReader);
+        int count = 0;
+        try {
+            for (GenericData.Record r : successStream) {
+                // Records are expected in the order they were enqueued.
+                if (count == 0) {
+                    Assert.assertEquals(convertNested(goodRecord1), r);
+                } else {
+                    Assert.assertEquals(convertNested(goodRecord2), r);
+                }
+                count++;
+            }
+        } finally {
+            // Close the stream even when an assertion above fails.
+            successStream.close();
+        }
+        Assert.assertEquals(2, count);
+    }
+
+    /**
+     * Builds the OUTPUT_SCHEMA record that corresponds to {@code inputRecord}:
+     * "id" parsed as a long, "primaryColor" copied to "color", and "price" parsed
+     * as a double, or null when the input record has no price.
+     *
+     * @param inputRecord record carrying "id", "primaryColor" and "price" fields
+     * @return the record built from those fields
+     */
+    private GenericData.Record convertBasic(GenericData.Record inputRecord) {
+        final GenericData.Record expected = new GenericData.Record(OUTPUT_SCHEMA);
+        expected.put("id", Long.parseLong(inputRecord.get("id").toString()));
+        expected.put("color", inputRecord.get("primaryColor").toString());
+
+        // A missing price stays null; anything else must parse as a double.
+        final Object rawPrice = inputRecord.get("price");
+        Double expectedPrice = null;
+        if (rawPrice != null) {
+            expectedPrice = Double.parseDouble(rawPrice.toString());
+        }
+        expected.put("price", expectedPrice);
+        return expected;
+    }
+
+    /**
+     * Builds an INPUT_SCHEMA record from the given field values; each argument is
+     * stored as-is under the field of the same name.
+     *
+     * @param id value for the "id" field
+     * @param primaryColor value for the "primaryColor" field
+     * @param secondaryColor value for the "secondaryColor" field
+     * @param price value for the "price" field
+     * @return the populated record
+     */
+    private GenericData.Record dataBasic(String id, String primaryColor,
+                                         String secondaryColor, String price) {
+        final String[] fieldNames = {"id", "primaryColor", "secondaryColor", "price"};
+        final String[] fieldValues = {id, primaryColor, secondaryColor, price};
+
+        final GenericData.Record record = new GenericData.Record(INPUT_SCHEMA);
+        for (int i = 0; i < fieldNames.length; i++) {
+            record.put(fieldNames[i], fieldValues[i]);
+        }
+        return record;
+    }
+
+    /**
+     * Builds the TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA record that
+     * corresponds to {@code inputRecord}: "l1" is copied, "s1" is parsed as a long,
+     * and "parentId" is taken from the nested "parent" record's "id" when a parent
+     * is present.
+     *
+     * @param inputRecord record carrying "l1", "s1" and an optional "parent" record
+     * @return the flattened record
+     */
+    private GenericData.Record convertNested(GenericData.Record inputRecord) {
+        GenericData.Record result = new GenericData.Record(
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
+        result.put("l1", inputRecord.get("l1"));
+        result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
+        if (inputRecord.get("parent") != null) {
+            // output schema doesn't have parent name.
+            result.put("parentId",
+                    ((GenericData.Record) inputRecord.get("parent")).get("id"));
+        }
+        return result;
+    }
+
+    /**
+     * Builds a TestAvroRecordConverter.NESTED_RECORD_SCHEMA record with "l1" set to
+     * {@code id} and "s1" set to {@code companyName}. A nested "parent" record
+     * (TestAvroRecordConverter.NESTED_PARENT_SCHEMA) is attached when either
+     * {@code parentId} or {@code parentName} is non-null.
+     *
+     * @param id value for the "l1" field
+     * @param companyName value for the "s1" field
+     * @param parentId value for the parent's "id" field, may be null
+     * @param parentName value for the parent's "name" field, may be null
+     * @return the populated record
+     */
+    private GenericData.Record dataNested(long id, String companyName, Long parentId,
+                                          String parentName) {
+        GenericData.Record result = new GenericData.Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
+        result.put("l1", id);
+        result.put("s1", companyName);
+        if (parentId != null || parentName != null) {
+            GenericData.Record parent = new GenericData.Record(
+                    TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
+            parent.put("id", parentId);
+            parent.put("name", parentName);
+            result.put("parent", parent);
+        }
+        return result;
+    }
+}

Reply via email to