[
https://issues.apache.org/jira/browse/NIFI-751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618973#comment-14618973
]
ASF GitHub Bot commented on NIFI-751:
-------------------------------------
Github user jackowaya commented on a diff in the pull request:
https://github.com/apache/incubator-nifi/pull/70#discussion_r34173958
--- Diff:
nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+import org.codehaus.jackson.JsonNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Responsible for converting records of one Avro type to another. Supports
+ * syntax like "record.field" to unpack fields and will try to do simple type
+ * conversion.
+ */
+public class AvroRecordConverter {
+ private final Schema inputSchema;
+ private final Schema outputSchema;
+ private final Map<String, String> fieldMapping;
+
+ public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
+ JsonNode fieldMapping) {
+ this.inputSchema = inputSchema;
+ this.outputSchema = outputSchema;
+ this.fieldMapping = getFieldMapping(fieldMapping);
+ }
+
+ /**
+ * Converts one record to another given an input and output schema plus
+ * explicit mappings for certain target fields.
+ *
+ * @param record
+ * @param inputSchema
+ * @param outputSchema
+ * @param fieldMapping
+ * @return
+ * @throws AvroConversionException
+ */
+ public Record convert(Record input) throws AvroConversionException {
+ Record result = new Record(outputSchema);
+ for (Field outputField : outputSchema.getFields()) {
+ String inputFieldName = outputField.name();
+ if (fieldMapping.containsKey(outputField.name())) {
+ inputFieldName = fieldMapping.get(outputField.name());
+ }
+
+ List<String> fieldParts = Lists.newArrayList(inputFieldName
+ .split("\\."));
+ Record current = input;
+ while (fieldParts.size() > 1) {
+ // Step into the nested records as far as needed.
+ current = (Record) current.get(fieldParts.remove(0));
+ }
+
+ // Current should now be in the right place to read the record.
+ Field f = getFieldForName(inputFieldName, inputSchema);
+ Object content = getContentForName(input, inputFieldName,
+ input.getSchema());
+ result.put(outputField.name(),
+ convertData(content, f.schema(), outputField.schema()));
+ }
+ return result;
+ }
+
+ /**
+ * @return the inputSchema
+ */
+ public Schema getInputSchema() {
+ return inputSchema;
+ }
+
+ /**
+ * @return the outputSchema
+ */
+ public Schema getOutputSchema() {
+ return outputSchema;
+ }
+
+ /**
+ * Converts the data from one schema to another. If the types are the same,
+ * no change will be made, but simple conversions will be attempted for
+ * other types.
+ *
+ * @param content
+ * @param inputSchema
+ * @param outputSchema
+ * @return
+ * @throws AvroConversionException
+ */
+ private Object convertData(Object content, Schema inputSchema,
+ Schema outputSchema) throws AvroConversionException {
+ if (content == null) {
+ // No conversion can happen here.
+ return null;
+ }
+
+ Schema nonNillInput = undoNillableSchema(inputSchema);
+ Schema nonNillOutput = undoNillableSchema(outputSchema);
+ if (nonNillInput.getType().equals(nonNillOutput.getType())) {
+ return content;
+ } else {
+ switch (nonNillOutput.getType()) {
+ case STRING:
+ // This is the easiest conversion case. Converting to string we
+ // assume
+ // that String.valueOf knows what to do.
+ return String.valueOf(content);
+
+ // For the rest of these, we will try to convert through string
+ // which
+ // isn't super-efficient but should work.
+ case LONG:
+ try {
+ return Long.parseLong(String.valueOf(content));
+ } catch (NumberFormatException e) {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to long");
+ }
+ case INT:
+ try {
+ return Integer.parseInt(String.valueOf(content));
+ } catch (NumberFormatException e) {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to int");
+ }
+ case DOUBLE:
+ try {
+ return Double.parseDouble(String.valueOf(content));
+ } catch (NumberFormatException e) {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to double");
+ }
+ case FLOAT:
+ try {
+ return Float.parseFloat(String.valueOf(content));
+ } catch (NumberFormatException e) {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to float");
+ }
+ default:
+ throw new AvroConversionException("Cannot convert to type "
+ + nonNillOutput.getType());
+ }
+ }
+ }
+
+ /**
+ * Get a mapping from output column to input column definition.
+ *
+ * @param mappingStr
+ * @return
+ * @throws AvroConversionException
+ */
+ private Map<String, String> getFieldMapping(JsonNode listNode) {
--- End diff --
There is an automatic mapping from a name to itself, but I agree that it
would be nice to have a validation step that takes the field mappings into
account to make sure the output schema is fully mapped. I'll look into where
to put that validation, since it interacts with all three properties of the
conversion.
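For illustration only, here is a minimal sketch of what such a check might look like. It is not code from the pull request. The class and method names are hypothetical, and the schemas are stood in for by plain collections of field names (dotted paths for nested input fields), so it deliberately avoids the Avro API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of the pull request under review. */
public class MappingValidator {

    /**
     * Returns the output fields that cannot be resolved to any input field.
     *
     * @param inputFieldPaths  dotted paths readable from the input, e.g. "parent.name"
     *                         (assumed to be precomputed from the input schema)
     * @param outputFieldNames field names of the output schema
     * @param fieldMapping     explicit output-name to input-path overrides
     */
    public static List<String> findUnmappedOutputFields(
            Set<String> inputFieldPaths,
            List<String> outputFieldNames,
            Map<String, String> fieldMapping) {
        List<String> unmapped = new ArrayList<>();
        for (String outputName : outputFieldNames) {
            // Same fallback as convert(): a name maps to itself unless overridden.
            String inputPath = fieldMapping.containsKey(outputName)
                    ? fieldMapping.get(outputName)
                    : outputName;
            if (!inputFieldPaths.contains(inputPath)) {
                unmapped.add(outputName);
            }
        }
        return unmapped;
    }
}
```

An empty result would mean every output field has a source. A real check would presumably also need to decide how to treat nullable output fields or fields with defaults, which this sketch ignores.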
> Add Processor To Convert Avro Formats
> -------------------------------------
>
> Key: NIFI-751
> URL: https://issues.apache.org/jira/browse/NIFI-751
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 0.1.0
> Reporter: Alan Jackoway
>
> When working with data from external sources, such as complex WSDL, I
> frequently wind up with complex nested data that is difficult to work with
> even when converted to Avro format. Specifically, I often have two needs:
> * Converting types of data, usually from string to long, double, etc. when
> APIs give only string data back.
> * Flattening data by taking fields out of nested records and putting them on
> the top level of the Avro file.
> Unfortunately the Kite JSONToAvro processor only supports exact conversions
> from JSON to a matching Avro schema and will not do data transformations of
> this type. Proposed processor to come.
> Discussed this with [~rdblue], so tagging him here as I don't have permission
> to set a CC for some reason.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)