NIFI-1356

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7008a305
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7008a305
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7008a305

Branch: refs/heads/master
Commit: 7008a3054eb8304bd6160c520846fce6a6837321
Parents: dc2e8ce
Author: Jeremy Dyer <[email protected]>
Authored: Mon Jan 25 18:28:38 2016 -0500
Committer: Jeremy Dyer <[email protected]>
Committed: Mon Jan 25 18:28:38 2016 -0500

----------------------------------------------------------------------
 .../nifi-kite-processors/pom.xml                |  17 +
 .../nifi/processors/kite/InferAvroSchema.java   | 460 +++++++++++++++++++
 .../processors/kite/InferAvroSchemaFromCSV.java | 235 ----------
 .../kite/InferAvroSchemaFromJSON.java           | 164 -------
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../processors/kite/TestInferAvroSchema.java    | 176 +++++++
 .../kite/TestInferAvroSchemaFromCSV.java        | 129 ------
 .../kite/TestInferAvroSchemaFromJSON.java       | 215 ---------
 .../src/test/resources/Shapes.json              |  10 +
 .../src/test/resources/Shapes.json.avro         |  34 ++
 .../src/test/resources/ShapesHeader.csv         |   1 +
 .../src/test/resources/Shapes_Header.csv        | 352 ++++++++++++++
 .../src/test/resources/Shapes_NoHeader.csv      | 351 ++++++++++++++
 .../src/test/resources/Shapes_header.csv.avro   |  23 +
 14 files changed, 1425 insertions(+), 745 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
index 2fd2344..62aaec8 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
@@ -130,4 +130,21 @@
         </dependency>
 
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/*.csv</exclude>
+                        <exclude>src/test/resources/*.json</exclude>
+                        <exclude>src/test/resources/*.avro</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
new file mode 100644
index 0000000..ad8b7e5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.apache.avro.Schema;
+import org.apache.commons.lang.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.kitesdk.data.spi.JsonUtil;
+import org.kitesdk.data.spi.filesystem.CSVProperties;
+import org.kitesdk.data.spi.filesystem.CSVUtil;
+
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.BufferedReader;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"kite", "avro", "infer", "schema", "csv", "json"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Examines the contents of the incoming FlowFile to 
infer an Avro schema. The processor will" +
+        " use the Kite SDK to make an attempt to automatically generate an 
Avro schema from the incoming content." +
+        " When inferring the schema from JSON data the key names will be used 
in the resulting Avro schema" +
+        " definition. When inferring from CSV data a \"header definition\" 
must be present either as the first line of the incoming data" +
+        " or the \"header definition\" must be explicitly set in the property 
\"CSV Header Definition\". A \"header definition\"" +
+        " is simply a single comma separated line defining the names of each 
column. The \"header definition\" is" +
+        " required in order to determine the names that should be given to 
each field in the resulting Avro definition." +
+        " When inferring data types the higher order data type is always used 
if there is ambiguity." +
+        " For example when examining numerical values the type may be set to 
\"long\" instead of \"integer\" since a long can" +
+        " safely hold the value of any \"integer\". Only CSV and JSON content 
is currently supported for automatically inferring an" +
+        " Avro schema. The type of content present in the incoming FlowFile is 
set by using the property \"Input Content Type\"." +
+        " The property can either be explicitly set to CSV, JSON, or \"use 
mime.type value\" which will examine the" +
+        " value of the mime.type attribute on the incoming FlowFile to 
determine the type of content present.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "mime.type", description = "If configured 
by property \"Input Content Type\" will" +
+                " use this value to determine what sort of content should be 
inferred from the incoming FlowFile content."),
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "inferred.avro.schema", description = "If 
configured by \"Schema output destination\" to" +
+                " write to an attribute this will hold the resulting Avro 
schema from inferring the incoming FlowFile content."),
+})
+public class InferAvroSchema
+        extends AbstractKiteProcessor {
+
+    public static final String CSV_DELIMITER = ",";
+    public static final String USE_MIME_TYPE = "use mime.type value";
+    public static final String JSON_CONTENT = "json";
+    public static final String CSV_CONTENT = "csv";
+
+    public static final String AVRO_SCHEMA_ATTRIBUTE_NAME = 
"inferred.avro.schema";
+    public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
+    public static final String DESTINATION_CONTENT = "flowfile-content";
+    public static final String JSON_MIME_TYPE = "application/json";
+    public static final String CSV_MIME_TYPE = "text/csv";
+    public static final String AVRO_MIME_TYPE = "application/avro-binary";
+    public static final String AVRO_FILE_EXTENSION = ".avro";
+
+    public static final PropertyDescriptor SCHEMA_DESTINATION = new 
PropertyDescriptor.Builder()
+            .name("Schema Output Destination")
+            .description("Control if Avro schema is written as a new flowfile 
attribute '" + AVRO_SCHEMA_ATTRIBUTE_NAME + "' " +
+                    "or written in the flowfile content. Writing to flowfile 
content will overwrite any " +
+                    "existing flowfile content.")
+            .required(true)
+            .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
+            .defaultValue(DESTINATION_CONTENT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INPUT_CONTENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Input Content Type")
+            .description("Content Type of data present in the incoming 
FlowFile's content. Only \"" +
+                    JSON_CONTENT + "\" or \"" + CSV_CONTENT + "\" are 
supported." +
+                    " If this value is set to \"" + USE_MIME_TYPE + "\" the 
incoming Flowfile's attribute \"" + CoreAttributes.MIME_TYPE + "\"" +
+                    " will be used to determine the Content Type.")
+            .allowableValues(USE_MIME_TYPE, JSON_CONTENT, CSV_CONTENT)
+            .defaultValue(USE_MIME_TYPE)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor 
GET_CSV_HEADER_DEFINITION_FROM_INPUT = new PropertyDescriptor.Builder()
+            .name("Get CSV Header Definition From Data")
+            .description("This property only applies to CSV content type. If 
\"true\" the processor will attempt to read the CSV header definition from the" 
+
+                    " first line of the input data.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CSV_HEADER_DEFINITION = new 
PropertyDescriptor.Builder()
+            .name("CSV Header Definition")
+            .description("This property only applies to CSV content type. 
Comma separated string defining the column names expected in the CSV data." +
+                    " EX: \"fname,lname,zip,address\". The elements present in 
this string should be in the same order" +
+                    " as the underlying data. Setting this property will cause 
the value of" +
+                    " \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + 
"\" to be ignored instead using this value.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new 
PropertyDescriptor.Builder()
+            .name("CSV Header Line Skip Count")
+            .description("This property only applies to CSV content type. 
Specifies the number of lines that should be skipped when reading the CSV 
data." +
+                    " Setting this value to 0 is equivalent to saying \"the 
entire contents of the file should be read\". If the" +
+                    " property \"" + 
GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" is set then the first line 
of the CSV " +
+                    " file will be read in and treated as the CSV header 
definition. Since this will remove the header line from the data" +
+                    " care should be taken to make sure the value of \"CSV 
Header Line Skip Count\" is set to 0 to ensure" +
+                    " no data is skipped.")
+            .required(true)
+            .defaultValue("0")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ESCAPE_STRING = new 
PropertyDescriptor.Builder()
+            .name("CSV Escape String")
+            .description("This property only applies to CSV content type. 
String that represents an escape sequence" +
+                    " in the CSV FlowFile content data.")
+            .required(true)
+            .defaultValue("\\")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor QUOTE_STRING = new 
PropertyDescriptor.Builder()
+            .name("CSV Quote String")
+            .description("This property only applies to CSV content type. 
String that represents a literal quote" +
+                    " character in the CSV FlowFile content data.")
+            .required(true)
+            .defaultValue("'")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_NAME = new 
PropertyDescriptor.Builder()
+            .name("Avro Record Name")
+            .description("Value to be placed in the Avro record schema 
\"name\" field.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Charset")
+            .description("Character encoding of CSV data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new 
PropertyDescriptor.Builder()
+            .name("Pretty Avro Output")
+            .description("If true the Avro output will be formatted.")
+            .required(true)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new 
PropertyDescriptor.Builder()
+            .name("Number Of Records To Analyze")
+            .description("This property only applies to JSON content type. The 
number of JSON records that should be" +
+                    " examined to determine the Avro schema. The higher the 
value the better chance kite has of detecting" +
+                    " the appropriate type. However the default value of 10 is 
almost always enough.")
+            .required(true)
+            .defaultValue("10")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("Successfully created Avro schema from 
data.").build();
+
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("Original incoming FlowFile data").build();
+
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("Failed to create Avro schema from data.").build();
+
+    public static final Relationship REL_UNSUPPORTED_CONTENT = new 
Relationship.Builder().name("unsupported content")
+            .description("The content found in the flowfile content is not of 
the required format.").build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(SCHEMA_DESTINATION);
+        properties.add(INPUT_CONTENT_TYPE);
+        properties.add(CSV_HEADER_DEFINITION);
+        properties.add(GET_CSV_HEADER_DEFINITION_FROM_INPUT);
+        properties.add(HEADER_LINE_SKIP_COUNT);
+        properties.add(ESCAPE_STRING);
+        properties.add(QUOTE_STRING);
+        properties.add(PRETTY_AVRO_OUTPUT);
+        properties.add(RECORD_NAME);
+        properties.add(NUM_RECORDS_TO_ANALYZE);
+        properties.add(CHARSET);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_UNSUPPORTED_CONTENT);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        try {
+
+            final AtomicReference<String> avroSchema = new AtomicReference<>();
+            switch (context.getProperty(INPUT_CONTENT_TYPE).getValue()) {
+                case USE_MIME_TYPE:
+                    avroSchema.set(inferAvroSchemaFromMimeType(original, 
context, session));
+                    break;
+                case JSON_CONTENT:
+                    avroSchema.set(inferAvroSchemaFromJSON(original, context, 
session));
+                    break;
+                case CSV_CONTENT:
+                    avroSchema.set(inferAvroSchemaFromCSV(original, context, 
session));
+                    break;
+                default:
+                    //Shouldn't be possible but just in case
+                    session.transfer(original, REL_UNSUPPORTED_CONTENT);
+                    break;
+            }
+
+
+            if (StringUtils.isNotEmpty(avroSchema.get())) {
+
+                String destination = 
context.getProperty(SCHEMA_DESTINATION).getValue();
+                FlowFile avroSchemaFF = null;
+
+                switch (destination) {
+                    case DESTINATION_ATTRIBUTE:
+                        avroSchemaFF = 
session.putAttribute(session.clone(original), AVRO_SCHEMA_ATTRIBUTE_NAME, 
avroSchema.get());
+                        //Leaves the original CoreAttributes.MIME_TYPE in 
place.
+                        break;
+                    case DESTINATION_CONTENT:
+                        avroSchemaFF = session.write(session.create(), new 
OutputStreamCallback() {
+                            @Override
+                            public void process(OutputStream out) throws 
IOException {
+                                out.write(avroSchema.get().getBytes());
+                            }
+                        });
+                        avroSchemaFF = session.putAttribute(avroSchemaFF, 
CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
+                        break;
+                    default:
+                        break;
+                }
+
+                //Transfer the sessions.
+                avroSchemaFF = session.putAttribute(avroSchemaFF, 
CoreAttributes.FILENAME.key(), 
(original.getAttribute(CoreAttributes.FILENAME.key()) + AVRO_FILE_EXTENSION));
+                session.transfer(avroSchemaFF, REL_SUCCESS);
+                session.transfer(original, REL_ORIGINAL);
+            } else {
+                //If the avroSchema is null then the content type is unknown 
and therefore unsupported
+                session.transfer(original, REL_UNSUPPORTED_CONTENT);
+            }
+
+        } catch (Exception ex) {
+            getLogger().error("Failed to infer Avro schema for {} due to {}", 
new Object[] {original, ex});
+            session.transfer(original, REL_FAILURE);
+        }
+    }
+
+
+    /**
+     * Infers the Avro schema from the input Flowfile content. To infer an 
Avro schema for CSV content a header line is
+     * required. You can configure the processor to pull that header line from 
the first line of the CSV data if it is
+     * present OR you can manually supply the desired header line as a 
property value.
+     *
+     * @param inputFlowFile
+     *  The original input FlowFile containing the CSV content as it entered 
this processor.
+     *
+     * @param context
+     *  ProcessContext to pull processor configurations.
+     *
+     * @param session
+     *  ProcessSession to transfer FlowFiles
+     */
+    private String inferAvroSchemaFromCSV(final FlowFile inputFlowFile, final 
ProcessContext context, final ProcessSession session) {
+
+        //Determines the header line either from the property input or the 
first line of the delimited file.
+        final AtomicReference<String> header = new AtomicReference<>();
+        final AtomicReference<Boolean> hasHeader = new AtomicReference<>();
+
+        if 
(context.getProperty(GET_CSV_HEADER_DEFINITION_FROM_INPUT).asBoolean() == 
Boolean.TRUE) {
+            //Read the first line of the file to get the header value.
+            session.read(inputFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
+                    header.set(br.readLine());
+                    hasHeader.set(Boolean.TRUE);
+                    br.close();
+                }
+            });
+            hasHeader.set(Boolean.TRUE);
+        } else {
+            
header.set(context.getProperty(CSV_HEADER_DEFINITION).evaluateAttributeExpressions(inputFlowFile).getValue());
+            hasHeader.set(Boolean.FALSE);
+        }
+
+        //Prepares the CSVProperties for kite
+        final CSVProperties props = new CSVProperties.Builder()
+                .delimiter(CSV_DELIMITER)
+                
.escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions().getValue())
+                
.quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions().getValue())
+                .header(header.get())
+                .hasHeader(hasHeader.get())
+                
.linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions().asInteger())
+                .charset(context.getProperty(CHARSET).getValue())
+                .build();
+
+        final AtomicReference<String> avroSchema = new AtomicReference<>();
+
+        session.read(inputFlowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                avroSchema.set(CSVUtil
+                        .inferSchema(
+                                
context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(), in, 
props)
+                        
.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
+            }
+        });
+
+        return avroSchema.get();
+    }
+
+    /**
+     * Infers the Avro schema from the input Flowfile content.
+     *
+     * @param inputFlowFile
+     *  The original input FlowFile containing the JSON content as it entered 
this processor.
+     *
+     * @param context
+     *  ProcessContext to pull processor configurations.
+     *
+     * @param session
+     *  ProcessSession to transfer FlowFiles
+     */
+    private String inferAvroSchemaFromJSON(final FlowFile inputFlowFile, final 
ProcessContext context, final ProcessSession session) {
+
+        final AtomicReference<String> avroSchema = new AtomicReference<>();
+        session.read(inputFlowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                Schema as = JsonUtil.inferSchema(
+                        in, 
context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(),
+                        
context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions().asInteger());
+                
avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
+
+            }
+        });
+
+        return avroSchema.get();
+    }
+
+    /**
+     * Examines the incoming FlowFiles mime.type attribute to determine if the 
schema should be inferred for CSV or JSON data.
+     *
+     * @param inputFlowFile
+     *  The original input FlowFile containing the content.
+     *
+     * @param context
+     *  ProcessContext to pull processor configurations.
+     *
+     * @param session
+     *  ProcessSession to transfer FlowFiles
+     */
+    private String inferAvroSchemaFromMimeType(final FlowFile inputFlowFile, 
final ProcessContext context, final ProcessSession session) {
+
+        String mimeType = 
inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+        String avroSchema = "";
+
+        if (mimeType != null) {
+            switch (mimeType) {
+                case JSON_MIME_TYPE:
+                    getLogger().debug("Inferred content type as JSON from 
\"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
+                            
inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
+                    avroSchema = inferAvroSchemaFromJSON(inputFlowFile, 
context, session);
+                    break;
+                case CSV_MIME_TYPE:
+                    getLogger().debug("Inferred content type as CSV from 
\"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
+                            
inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
+                    avroSchema = inferAvroSchemaFromCSV(inputFlowFile, 
context, session);
+                    break;
+                default:
+                    getLogger().warn("Unable to infer Avro Schema from {} 
because its mime type is {}, " +
+                            "which is not supported by this Processor", new 
Object[] {inputFlowFile, mimeType} );
+                    break;
+            }
+        }
+
+        return avroSchema;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
deleted file mode 100644
index 54ebedd..0000000
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-
-import java.io.IOException;
-import java.io.BufferedReader;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.kitesdk.data.spi.filesystem.CSVProperties;
-import org.kitesdk.data.spi.filesystem.CSVUtil;
-import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
-
-
-@Tags({"kite", "csv", "avro", "infer", "schema"})
-@SeeAlso({InferAvroSchemaFromCSV.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Creates an Avro schema from a CSV file header. The 
header line definition can either be provided" +
-        "as a property to the processor OR present in the first line of CSV in 
the incoming FlowFile content. If a header" +
-        " property is specified for this processor no attempt will be made to 
use the header line that may be present" +
-        " in the incoming CSV FlowFile content.")
-public class InferAvroSchemaFromCSV
-        extends AbstractKiteProcessor {
-
-    public static final String CSV_DELIMITER = ",";
-
-    public static final PropertyDescriptor HEADER_LINE = new 
PropertyDescriptor.Builder()
-            .name("CSV Header Line")
-            .description("Comma separated string defining the column names 
expected in the CSV data. " +
-                    "EX: \"fname,lname,zip,address\"")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new 
PropertyDescriptor.Builder()
-            .name("CSV Header Line Skip Count")
-            .description("Specifies the number of header lines that should be 
skipped when reading the CSV data. If the " +
-                    " first line of the CSV data is a header line and you 
specify the \"CSV Header Line\" property " +
-                    "you need to set this value to 1 otherwise the header line 
will be treated as actual data.")
-            .required(true)
-            .defaultValue("0")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor ESCAPE_STRING = new 
PropertyDescriptor.Builder()
-            .name("CSV escape string")
-            .description("String that represents an escape sequence in the CSV 
FlowFile content data.")
-            .required(true)
-            .defaultValue("\\")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor QUOTE_STRING = new 
PropertyDescriptor.Builder()
-            .name("CSV quote string")
-            .description("String that represents a literal quote character in 
the CSV FlowFile content data.")
-            .required(true)
-            .defaultValue("'")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor RECORD_NAME = new 
PropertyDescriptor.Builder()
-            .name("Avro Record Name")
-            .description("Value to be placed in the Avro record schema 
\"name\" field.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
-            .name("Charset")
-            .description("Character encoding of CSV data.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new 
PropertyDescriptor.Builder()
-            .name("Pretty Avro Output")
-            .description("If true the Avro output will be formatted.")
-            .required(true)
-            .defaultValue("true")
-            .allowableValues("true", "false")
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .build();
-
-
-    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
-            .description("Successfully created Avro schema for CSV 
data.").build();
-
-    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
-            .description("Original incoming FlowFile CSV data").build();
-
-    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
-            .description("Failed to create Avro schema for CSV data.").build();
-
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(HEADER_LINE);
-        properties.add(HEADER_LINE_SKIP_COUNT);
-        properties.add(ESCAPE_STRING);
-        properties.add(QUOTE_STRING);
-        properties.add(PRETTY_AVRO_OUTPUT);
-        properties.add(RECORD_NAME);
-        properties.add(CHARSET);
-        this.properties = Collections.unmodifiableList(properties);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_ORIGINAL);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-
-    @Override
-    public void onTrigger(final ProcessContext context, ProcessSession 
session) throws ProcessException {
-        final FlowFile original = session.get();
-        if (original == null) {
-            return;
-        }
-
-        try {
-
-            //Determines the header line either from the property input or the 
first line of the delimited file.
-            final AtomicReference<String> header = new AtomicReference<>();
-            final AtomicReference<Boolean> hasHeader = new AtomicReference<>();
-
-            if (context.getProperty(HEADER_LINE).isSet()) {
-                header.set(context.getProperty(HEADER_LINE).getValue());
-                hasHeader.set(Boolean.FALSE);
-            } else {
-                //Read the first line of the file to get the header value.
-                session.read(original, new InputStreamCallback() {
-                    @Override
-                    public void process(InputStream in) throws IOException {
-                        BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
-                        header.set(br.readLine());
-                        hasHeader.set(Boolean.TRUE);
-                        br.close();
-                    }
-                });
-            }
-
-            //Prepares the CSVProperties for kite
-            final CSVProperties props = new CSVProperties.Builder()
-                    .delimiter(CSV_DELIMITER)
-                    .escape(context.getProperty(ESCAPE_STRING).getValue())
-                    .quote(context.getProperty(QUOTE_STRING).getValue())
-                    .header(header.get())
-                    .hasHeader(hasHeader.get())
-                    
.linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).asInteger())
-                    .charset(context.getProperty(CHARSET).getValue())
-                    .build();
-
-            final Set<String> required = ImmutableSet.of();
-            final AtomicReference<String> avroSchema = new AtomicReference<>();
-
-            session.read(original, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    avroSchema.set(CSVUtil
-                            .inferNullableSchema(
-                                    
context.getProperty(RECORD_NAME).getValue(), in, props, required)
-                            
.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
-                }
-            });
-
-            FlowFile avroSchemaFF = session.write(session.create(), new 
OutputStreamCallback() {
-                @Override
-                public void process(OutputStream out) throws IOException {
-                    out.write(avroSchema.get().getBytes());
-                }
-            });
-
-            //Transfer the sessions.
-            session.transfer(original, REL_ORIGINAL);
-            session.transfer(avroSchemaFF, REL_SUCCESS);
-
-        } catch (Exception ex) {
-            getLogger().error(ex.getMessage());
-            session.transfer(original, REL_FAILURE);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
deleted file mode 100644
index 77029ff..0000000
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import org.apache.avro.Schema;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.kitesdk.data.spi.JsonUtil;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicReference;
-
-@Tags({"kite", "json", "avro", "infer", "schema"})
-@SeeAlso({InferAvroSchemaFromJSON.class})
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Creates an Avro schema from JSON data. The Avro schema 
is inferred by examining the fields " +
-        "in the JSON input. The Avro schema generated by kite will use the 
same names present in the incoming JSON payload")
-public class InferAvroSchemaFromJSON
-        extends AbstractKiteProcessor {
-
-    public static final PropertyDescriptor RECORD_NAME = new 
PropertyDescriptor.Builder()
-            .name("Avro Record Name")
-            .description("Value to be placed in the Avro record schema 
\"name\" field.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new 
PropertyDescriptor.Builder()
-            .name("Number of records to analyze")
-            .description("Number of records that should be analyzed by kite to 
infer the Avro schema")
-            .required(true)
-            .defaultValue("10")
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
-            .name("Charset")
-            .description("Character encoding of CSV data.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new 
PropertyDescriptor.Builder()
-            .name("Pretty Avro Output")
-            .description("If true the Avro output will be formatted.")
-            .required(true)
-            .defaultValue("true")
-            .allowableValues("true", "false")
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .build();
-
-
-    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
-            .description("Successfully created Avro schema for JSON 
data.").build();
-
-    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
-            .description("Original incoming FlowFile JSON data").build();
-
-    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
-            .description("Failed to create Avro schema for JSON 
data.").build();
-
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(CHARSET);
-        properties.add(PRETTY_AVRO_OUTPUT);
-        properties.add(RECORD_NAME);
-        properties.add(NUM_RECORDS_TO_ANALYZE);
-        this.properties = Collections.unmodifiableList(properties);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_ORIGINAL);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-
-    @Override
-    public void onTrigger(final ProcessContext context, ProcessSession 
session) throws ProcessException {
-        final FlowFile original = session.get();
-        if (original == null) {
-            return;
-        }
-
-        try {
-
-            final AtomicReference<String> avroSchema = new AtomicReference<>();
-            session.read(original, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    Schema as = JsonUtil.inferSchema(
-                            in, context.getProperty(RECORD_NAME).getValue(), 
context.getProperty(NUM_RECORDS_TO_ANALYZE).asInteger());
-                    
avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
-
-                }
-            });
-
-            FlowFile avroSchemaFF = session.write(session.create(), new 
OutputStreamCallback() {
-                @Override
-                public void process(OutputStream out) throws IOException {
-                    out.write(avroSchema.get().getBytes());
-                }
-            });
-
-            //Transfer the FlowFiles
-            session.transfer(original, REL_ORIGINAL);
-            session.transfer(avroSchemaFF, REL_SUCCESS);
-
-        } catch (Exception ex) {
-            getLogger().error(ex.getMessage());
-            session.transfer(original, REL_FAILURE);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 7a89856..59fbe2d 100644
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,5 +16,4 @@ org.apache.nifi.processors.kite.StoreInKiteDataset
 org.apache.nifi.processors.kite.ConvertCSVToAvro
 org.apache.nifi.processors.kite.ConvertJSONToAvro
 org.apache.nifi.processors.kite.ConvertAvroSchema
-org.apache.nifi.processors.kite.InferAvroSchemaFromCSV
-org.apache.nifi.processors.kite.InferAvroSchemaFromJSON
+org.apache.nifi.processors.kite.InferAvroSchema

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
new file mode 100644
index 0000000..3e8e702
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.junit.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestInferAvroSchema {
+
+    private TestRunner runner = null;
+
+    @Before
+    public void setup() {
+        runner = TestRunners.newTestRunner(InferAvroSchema.class);
+
+        //Prepare the common setup.
+        runner.assertNotValid();
+
+        runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, 
InferAvroSchema.USE_MIME_TYPE);
+        
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, 
"true");
+        runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, 
InferAvroSchema.DESTINATION_CONTENT);
+        runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "0");
+        runner.setProperty(InferAvroSchema.ESCAPE_STRING, "\\");
+        runner.setProperty(InferAvroSchema.QUOTE_STRING, "'");
+        runner.setProperty(InferAvroSchema.RECORD_NAME, 
"com.jeremydyer.contact");
+        runner.setProperty(InferAvroSchema.CHARSET, "UTF-8");
+        runner.setProperty(InferAvroSchema.PRETTY_AVRO_OUTPUT, "true");
+    }
+
+    @Test
+    public void inferAvroSchemaFromHeaderDefinitionOfCSVFile() throws 
Exception {
+
+        runner.assertValid();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+        runner.enqueue(new 
File("src/test/resources/Shapes_Header.csv").toPath(), attributes);
+
+        runner.run();
+        runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(new 
File("src/test/resources/Shapes_Header.csv.avro").toPath());
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/avro-binary");
+    }
+
+    @Test
+    public void inferAvroSchemaFromJSONFile() throws Exception {
+
+        runner.assertValid();
+
+        runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, 
InferAvroSchema.USE_MIME_TYPE);
+
+        //Purposely set to True to test that none of the JSON file is read 
which would cause issues.
+        
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, 
"true");
+        runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, 
InferAvroSchema.DESTINATION_ATTRIBUTE);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), 
attributes);
+
+        runner.run();
+        runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+        MockFlowFile data = 
runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+        String avroSchema = 
data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
+        String knownSchema = FileUtils.readFileToString(new 
File("src/test/resources/Shapes.json.avro"));
+        Assert.assertEquals(avroSchema, knownSchema);
+
+        //Since the avro schema is written to an attribute this should be the 
same as the original
+        data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/json");
+    }
+
+    @Test
+    public void inferAvroSchemaFromCSVFile() throws Exception {
+
+        runner.assertValid();
+
+        //Read in the header
+        StringWriter writer = new StringWriter();
+        
IOUtils.copy((Files.newInputStream(Paths.get("src/test/resources/ShapesHeader.csv"),
 StandardOpenOption.READ)), writer, "UTF-8");
+        runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, 
writer.toString());
+        
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, 
"false");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+        runner.enqueue(new 
File("src/test/resources/Shapes_NoHeader.csv").toPath(), attributes);
+
+        runner.run();
+        runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+        MockFlowFile data = 
runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+        
data.assertContentEquals(Paths.get("src/test/resources/Shapes_Header.csv.avro"));
+        data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/avro-binary");
+    }
+
+    @Test
+    public void inferSchemaFormHeaderLinePropertyOfProcessor() throws 
Exception {
+
+        final String CSV_HEADER_LINE = FileUtils.readFileToString(new 
File("src/test/resources/ShapesHeader.csv"));
+
+        runner.assertValid();
+
+        
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, 
"false");
+        runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, 
CSV_HEADER_LINE);
+        runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "1");
+
+        runner.assertValid();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+        runner.enqueue((CSV_HEADER_LINE + 
"\nJeremy,Dyer,29,55555").getBytes(), attributes);
+
+        runner.run();
+        runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+        MockFlowFile data = 
runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+        data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/avro-binary");
+    }
+
+    @Test
+    public void inferSchemaFromEmptyContent() throws Exception  {
+        runner.assertValid();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+        runner.enqueue("", attributes);
+
+        runner.run();
+        runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 1);
+        runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
deleted file mode 100644
index 78c4eab..0000000
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class TestInferAvroSchemaFromCSV {
-
-    private final String CSV_HEADER_LINE = "fname,lname,age,zip";
-
-    @Test
-    public void inferSchemaFromHeaderLineOfCSV() throws Exception {
-        TestRunner runner = 
TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
-
-        runner.assertNotValid();
-        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "0");
-        runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
-        runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
-        runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
-        runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
-        runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
-
-        runner.assertValid();
-
-        ProcessSession session = 
runner.getProcessSessionFactory().createSession();
-        FlowFile ff = session.write(session.create(), new 
OutputStreamCallback() {
-            @Override
-            public void process(OutputStream out) throws IOException {
-                out.write((CSV_HEADER_LINE + 
"\nJeremy,Dyer,29,55555").getBytes());
-            }
-        });
-
-        //Enqueue the empty FlowFile
-        runner.enqueue(ff);
-        runner.run();
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
-    }
-
-    @Test
-    public void inferSchemaFormHeaderLinePropertyOfProcessor() throws 
Exception {
-        TestRunner runner = 
TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
-
-        runner.assertNotValid();
-        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, 
CSV_HEADER_LINE);
-        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
-        runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
-        runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
-        runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
-        runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
-        runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
-
-        runner.assertValid();
-
-        ProcessSession session = 
runner.getProcessSessionFactory().createSession();
-        FlowFile ff = session.write(session.create(), new StreamCallback() {
-            @Override
-            public void process(InputStream in, OutputStream out) throws 
IOException {
-                out.write((CSV_HEADER_LINE + 
"\nJeremy,Dyer,29,55555").getBytes());
-            }
-        });
-
-        //Enqueue the empty FlowFile
-        runner.enqueue(ff);
-        runner.run();
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
-    }
-
-    @Test
-    public void inferSchemaFromEmptyContent() throws Exception  {
-        TestRunner runner = 
TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
-
-        runner.assertNotValid();
-        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, 
CSV_HEADER_LINE);
-        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
-        runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
-        runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
-        runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
-        runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
-        runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
-
-        runner.assertValid();
-
-        ProcessSession session = 
runner.getProcessSessionFactory().createSession();
-        FlowFile ff = session.write(session.create(), new StreamCallback() {
-            @Override
-            public void process(InputStream in, OutputStream out) throws 
IOException {
-                out.write("".getBytes());
-            }
-        });
-
-        //Enqueue the empty FlowFile
-        runner.enqueue(ff);
-        runner.run();
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 1);
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 0);
-        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 0);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
deleted file mode 100644
index 1c63ba1..0000000
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
-
-import static org.apache.nifi.processors.kite.TestUtil.streamFor;
-
-public class TestInferAvroSchemaFromJSON {
-
-    public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
-            .fields().requiredString("id").requiredString("primaryColor")
-            .optionalString("secondaryColor").optionalString("price")
-            .endRecord();
-
-    public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
-            .fields().requiredLong("id").requiredString("color")
-            .optionalDouble("price").endRecord();
-
-    public static final String MAPPING = "[{\"source\":\"primaryColor\", 
\"target\":\"color\"}]";
-
-    public static final String FAILURE_SUMMARY = "Cannot convert free to 
double";
-
-    @Test
-    public void testBasicConversion() throws IOException {
-        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
-        runner.assertNotValid();
-        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
-                INPUT_SCHEMA.toString());
-        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
-                OUTPUT_SCHEMA.toString());
-        runner.setProperty("primaryColor", "color");
-        runner.assertValid();
-
-        // Two valid rows, and one invalid because "free" is not a double.
-        GenericData.Record goodRecord1 = dataBasic("1", "blue", null, null);
-        GenericData.Record goodRecord2 = dataBasic("2", "red", "yellow", 
"5.5");
-        GenericData.Record badRecord = dataBasic("3", "red", "yellow", "free");
-        List<GenericData.Record> input = Lists.newArrayList(goodRecord1, 
goodRecord2,
-                badRecord);
-
-        runner.enqueue(streamFor(input));
-        runner.run();
-
-        long converted = runner.getCounterValue("Converted records");
-        long errors = runner.getCounterValue("Conversion errors");
-        Assert.assertEquals("Should convert 2 rows", 2, converted);
-        Assert.assertEquals("Should reject 1 rows", 1, errors);
-
-        runner.assertTransferCount("success", 1);
-        runner.assertTransferCount("failure", 1);
-
-        MockFlowFile incompatible = runner.getFlowFilesForRelationship(
-                "failure").get(0);
-        GenericDatumReader<GenericData.Record> reader = new 
GenericDatumReader<GenericData.Record>(
-                INPUT_SCHEMA);
-        DataFileStream<GenericData.Record> stream = new 
DataFileStream<GenericData.Record>(
-                new ByteArrayInputStream(
-                        runner.getContentAsByteArray(incompatible)), reader);
-        int count = 0;
-        for (GenericData.Record r : stream) {
-            Assert.assertEquals(badRecord, r);
-            count++;
-        }
-        stream.close();
-        Assert.assertEquals(1, count);
-        Assert.assertEquals("Should accumulate error messages",
-                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
-
-        GenericDatumReader<GenericData.Record> successReader = new 
GenericDatumReader<GenericData.Record>(
-                OUTPUT_SCHEMA);
-        DataFileStream<GenericData.Record> successStream = new 
DataFileStream<GenericData.Record>(
-                new ByteArrayInputStream(runner.getContentAsByteArray(runner
-                        .getFlowFilesForRelationship("success").get(0))),
-                successReader);
-        count = 0;
-        for (GenericData.Record r : successStream) {
-            if (count == 0) {
-                Assert.assertEquals(convertBasic(goodRecord1), r);
-            } else {
-                Assert.assertEquals(convertBasic(goodRecord2), r);
-            }
-            count++;
-        }
-        successStream.close();
-        Assert.assertEquals(2, count);
-    }
-
-    @Test
-    public void testNestedConversion() throws IOException {
-        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
-        runner.assertNotValid();
-        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
-                TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
-        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
-                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
-        runner.setProperty("parent.id", "parentId");
-        runner.assertValid();
-
-        // Two valid rows
-        GenericData.Record goodRecord1 = dataNested(1L, "200", null, null);
-        GenericData.Record goodRecord2 = dataNested(2L, "300", 5L, 
"ParentCompany");
-        List<GenericData.Record> input = Lists.newArrayList(goodRecord1, 
goodRecord2);
-
-        runner.enqueue(streamFor(input));
-        runner.run();
-
-        long converted = runner.getCounterValue("Converted records");
-        long errors = runner.getCounterValue("Conversion errors");
-        Assert.assertEquals("Should convert 2 rows", 2, converted);
-        Assert.assertEquals("Should reject 0 rows", 0, errors);
-
-        runner.assertTransferCount("success", 1);
-        runner.assertTransferCount("failure", 0);
-
-        GenericDatumReader<GenericData.Record> successReader = new 
GenericDatumReader<GenericData.Record>(
-                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
-        DataFileStream<GenericData.Record> successStream = new 
DataFileStream<GenericData.Record>(
-                new ByteArrayInputStream(runner.getContentAsByteArray(runner
-                        .getFlowFilesForRelationship("success").get(0))),
-                successReader);
-        int count = 0;
-        for (GenericData.Record r : successStream) {
-            if (count == 0) {
-                Assert.assertEquals(convertNested(goodRecord1), r);
-            } else {
-                Assert.assertEquals(convertNested(goodRecord2), r);
-            }
-            count++;
-        }
-        successStream.close();
-        Assert.assertEquals(2, count);
-    }
-
-    private GenericData.Record convertBasic(GenericData.Record inputRecord) {
-        GenericData.Record result = new GenericData.Record(OUTPUT_SCHEMA);
-        result.put("id", Long.parseLong(inputRecord.get("id").toString()));
-        result.put("color", inputRecord.get("primaryColor").toString());
-        if (inputRecord.get("price") == null) {
-            result.put("price", null);
-        } else {
-            result.put("price",
-                    Double.parseDouble(inputRecord.get("price").toString()));
-        }
-        return result;
-    }
-
-    private GenericData.Record dataBasic(String id, String primaryColor,
-                                         String secondaryColor, String price) {
-        GenericData.Record result = new GenericData.Record(INPUT_SCHEMA);
-        result.put("id", id);
-        result.put("primaryColor", primaryColor);
-        result.put("secondaryColor", secondaryColor);
-        result.put("price", price);
-        return result;
-    }
-
-    private GenericData.Record convertNested(GenericData.Record inputRecord) {
-        GenericData.Record result = new GenericData.Record(
-                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
-        result.put("l1", inputRecord.get("l1"));
-        result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
-        if (inputRecord.get("parent") != null) {
-            // output schema doesn't have parent name.
-            result.put("parentId",
-                    ((GenericData.Record) 
inputRecord.get("parent")).get("id"));
-        }
-        return result;
-    }
-
-    private GenericData.Record dataNested(long id, String companyName, Long 
parentId,
-                                          String parentName) {
-        GenericData.Record result = new 
GenericData.Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
-        result.put("l1", id);
-        result.put("s1", companyName);
-        if (parentId != null || parentName != null) {
-            GenericData.Record parent = new GenericData.Record(
-                    TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
-            parent.put("id", parentId);
-            parent.put("name", parentName);
-            result.put("parent", parent);
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
new file mode 100644
index 0000000..cf56f5b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
@@ -0,0 +1,10 @@
+{
+  "shapes":
+  [
+    {"shape": "circle", "color": "red", "width": 100, "height": 100},
+    {"shape": "square", "color": "red", "width": 100, "height": 100},
+    {"shape": "sphere", "color": "red", "width": 100, "height": 100},
+    {"shape": "triangle", "color": "red", "width": 100, "height": 100},
+    {"shape": "rectangle", "color": "red", "width": 100, "height": 100}
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
new file mode 100644
index 0000000..e5f91e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
@@ -0,0 +1,34 @@
+{
+  "type" : "record",
+  "name" : "contact",
+  "namespace" : "com.jeremydyer",
+  "fields" : [ {
+    "name" : "shapes",
+    "type" : {
+      "type" : "array",
+      "items" : {
+        "type" : "record",
+        "name" : "shapes",
+        "namespace" : "",
+        "fields" : [ {
+          "name" : "shape",
+          "type" : "string",
+          "doc" : "Type inferred from '\"circle\"'"
+        }, {
+          "name" : "color",
+          "type" : "string",
+          "doc" : "Type inferred from '\"red\"'"
+        }, {
+          "name" : "width",
+          "type" : "int",
+          "doc" : "Type inferred from '100'"
+        }, {
+          "name" : "height",
+          "type" : "int",
+          "doc" : "Type inferred from '100'"
+        } ]
+      }
+    },
+    "doc" : "Type inferred from 
'[{\"shape\":\"circle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"square\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"sphere\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"triangle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"rectangle\",\"color\":\"red\",\"width\":100,\"height\":100}]'"
+  } ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
new file mode 100644
index 0000000..d2dbc6d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
@@ -0,0 +1 @@
+shape,color,width,height
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7008a305/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
new file mode 100644
index 0000000..5d4aeb0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
@@ -0,0 +1,352 @@
+shape,color,width,height
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
\ No newline at end of file

Reply via email to