GitHub user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/158#discussion_r50574955
--- Diff:
nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
---
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.kitesdk.data.spi.JsonUtil;
+import org.kitesdk.data.spi.filesystem.CSVProperties;
+import org.kitesdk.data.spi.filesystem.CSVUtil;
+import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
+import org.omg.CORBA.OBJ_ADAPTER;
+
+
+import java.io.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"kite", "avro", "infer", "schema", "csv", "json"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Examines the contents of the incoming FlowFile to
infer an Avro schema. The processor will" +
+ " use the Kite SDK to make an attempt to automatically generate an
Avro schema from the incoming content." +
+ " When inferring the schema from JSON data the key names will be
used in the resulting Avro schema" +
+ " definition. When inferring from CSV data a \"header definition\"
must be present either as the first line of the incoming data" +
+ " or the \"header definition\" must be explicitly set in the
property \"CSV Header Definition\". A \"header definition\"" +
+ " is simply a single comma separated line defining the names of
each column. The \"header definition\" is" +
+ " required in order to determine the names that should be given to
each field in the resulting Avro definition." +
+ " When inferring data types the higher order data type is always
used if there is ambiguity." +
+ " For example when examining numerical values the type may be set
to \"long\" instead of \"integer\" since a long can" +
+ " safely hold the value of any \"integer\". Only CSV and JSON
content is currently supported for automatically inferring an" +
+ " Avro schema. The type of content present in the incoming
FlowFile is set by using the property \"Input Content Type\"." +
+ " The property can either be explicitly set to CSV, JSON, or \"use
mime.type value\" which will examine the" +
+ " value of the mime.type attribute on the incoming FlowFile to
determine the type of content present.")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "mime.type", description = "If
configured by property \"Input Content Type\" will" +
+ " use this value to determine what sort of content should
be inferred from the incoming FlowFile content."),
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = "inferred.avro.schema", description =
"If configured by \"Schema output destination\" to" +
+ " write to an attribute this will hold the resulting Avro
schema from inferring the incoming FlowFile content."),
+})
+public class InferAvroSchema
+ extends AbstractKiteProcessor {
+
+ public static final String CSV_DELIMITER = ",";
+ public static final String USE_MIME_TYPE = "use mime.type value";
+ public static final String JSON_CONTENT = "json";
+ public static final String CSV_CONTENT = "csv";
+
+ public static final String AVRO_SCHEMA_ATTRIBUTE_NAME =
"inferred.avro.schema";
+ public static final String DESTINATION_ATTRIBUTE =
"flowfile-attribute";
+ public static final String DESTINATION_CONTENT = "flowfile-content";
+ public static final String JSON_MIME_TYPE = "application/json";
+ public static final String CSV_MIME_TYPE = "text/csv";
+ public static final String AVRO_MIME_TYPE = "application/avro-binary";
+ public static final String AVRO_FILE_EXTENSION = ".avro";
+
+ public static final PropertyDescriptor SCHEMA_DESTINATION = new
PropertyDescriptor.Builder()
+ .name("Schema output destination")
+ .description("Control if Avro schema is written as a new
flowfile attribute '" + AVRO_SCHEMA_ATTRIBUTE_NAME + "' " +
+ "or written in the flowfile content. Writing to
flowfile content will overwrite any " +
+ "existing flowfile content.")
+ .required(true)
+ .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
+ .defaultValue(DESTINATION_CONTENT)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor INPUT_CONTENT_TYPE = new
PropertyDescriptor.Builder()
+ .name("Input Content Type")
+ .description("Content Type of data present in the incoming
FlowFile's content. Only \"" +
+ JSON_CONTENT + "\" or \"" + CSV_CONTENT + "\" are
supported." +
+ " If this value is set to \"" + USE_MIME_TYPE + "\"
the incoming Flowfile's attribute \"" + CoreAttributes.MIME_TYPE + "\"" +
+ " will be used to determine the Content Type.")
+ .allowableValues(USE_MIME_TYPE, JSON_CONTENT, CSV_CONTENT)
+ .defaultValue(USE_MIME_TYPE)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor
GET_CSV_HEADER_DEFINITION_FROM_INPUT = new PropertyDescriptor.Builder()
+ .name("Get CSV header definition from data")
+ .description("This property only applies to CSV content type.
If \"true\" the processor will attempt to read the CSV header definition from
the ")
--- End diff --
It appears that the description was cut off halfway through a sentence: it ends with "...attempt to read the CSV header definition from the ". Could you complete it?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---