Repository: nifi
Updated Branches:
  refs/heads/master 7cb39d636 -> ce25ae541


NIFI-5667: Add nested record support for PutORC

NIFI-5667: Fixed default table name

NIFI-5667: Fixed handling of binary types

NIFI-5667: Added backticks in Hive DDL generation

This closes #3057.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ce25ae54
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ce25ae54
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ce25ae54

Branch: refs/heads/master
Commit: ce25ae54196a318cbfcdb4dfe178607f4ac135c6
Parents: 7cb39d6
Author: Matthew Burgess <[email protected]>
Authored: Tue Oct 9 18:59:00 2018 -0400
Committer: Bryan Bende <[email protected]>
Committed: Mon Oct 15 10:10:47 2018 -0400

----------------------------------------------------------------------
 .../serialization/record/MockRecordParser.java  |   4 +
 .../hadoop/hive/ql/io/orc/NiFiOrcUtils.java     | 498 +++++++++----------
 .../org/apache/nifi/processors/orc/PutORC.java  |  10 +-
 .../orc/record/ORCHDFSRecordWriter.java         |  31 +-
 .../apache/nifi/processors/orc/PutORCTest.java  |  40 +-
 .../apache/nifi/util/orc/TestNiFiOrcUtils.java  | 142 +++---
 .../src/test/resources/nested_record.avsc       |  42 ++
 7 files changed, 419 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
index 9b5441e..2f7c634 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
@@ -58,6 +58,10 @@ public class MockRecordParser extends 
AbstractControllerService implements Recor
         fields.add(new RecordField(fieldName, type.getDataType(), isNullable));
     }
 
+    public void addSchemaField(final RecordField recordField) {
+        fields.add(recordField);
+    }
+
     public void addRecord(Object... values) {
         records.add(values);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index 2e6d2ca..1418b1b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -16,11 +16,6 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.util.Utf8;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +38,15 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.orc.MemoryManager;
 import org.apache.orc.OrcConf;
 import org.apache.orc.impl.MemoryManagerImpl;
@@ -69,15 +73,9 @@ public class NiFiOrcUtils {
         if (o != null) {
             if (typeInfo instanceof UnionTypeInfo) {
                 OrcUnion union = new OrcUnion();
-                // Avro uses Utf8 and GenericData.EnumSymbol objects instead 
of Strings. This is handled in other places in the method, but here
-                // we need to determine the union types from the objects, so 
choose String.class if the object is one of those Avro classes
-                Class clazzToCompareTo = o.getClass();
-                if (o instanceof org.apache.avro.util.Utf8 || o instanceof 
GenericData.EnumSymbol) {
-                    clazzToCompareTo = String.class;
-                }
                 // Need to find which of the union types correspond to the 
primitive object
                 TypeInfo objectTypeInfo = 
TypeInfoUtils.getTypeInfoFromObjectInspector(
-                        
ObjectInspectorFactory.getReflectionObjectInspector(clazzToCompareTo, 
ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
+                        
ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), 
ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
                 List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) 
typeInfo).getAllUnionObjectTypeInfos();
 
                 int index = 0;
@@ -106,7 +104,7 @@ public class NiFiOrcUtils {
             if (o instanceof Double) {
                 return new DoubleWritable((double) o);
             }
-            if (o instanceof String || o instanceof Utf8 || o instanceof 
GenericData.EnumSymbol) {
+            if (o instanceof String) {
                 return new Text(o.toString());
             }
             if (o instanceof ByteBuffer) {
@@ -126,10 +124,19 @@ public class NiFiOrcUtils {
             }
             if (o instanceof Object[]) {
                 Object[] objArray = (Object[]) o;
-                TypeInfo listTypeInfo = ((ListTypeInfo) 
typeInfo).getListElementTypeInfo();
-                return Arrays.stream(objArray)
-                        .map(o1 -> convertToORCObject(listTypeInfo, o1, 
hiveFieldNames))
-                        .collect(Collectors.toList());
+                if(TypeInfoFactory.binaryTypeInfo.equals(typeInfo)) {
+                    byte[] dest = new byte[objArray.length];
+                    for(int i=0;i<objArray.length;i++) {
+                        dest[i] = (byte) objArray[i];
+                    }
+                    return new BytesWritable(dest);
+                } else {
+                    // If not binary, assume a list of objects
+                    TypeInfo listTypeInfo = ((ListTypeInfo) 
typeInfo).getListElementTypeInfo();
+                    return Arrays.stream(objArray)
+                            .map(o1 -> convertToORCObject(listTypeInfo, o1, 
hiveFieldNames))
+                            .collect(Collectors.toList());
+                }
             }
             if (o instanceof int[]) {
                 int[] intArray = (int[]) o;
@@ -163,19 +170,30 @@ public class NiFiOrcUtils {
                         .mapToObj((element) -> 
convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 
1, hiveFieldNames))
                         .collect(Collectors.toList());
             }
-            if (o instanceof GenericData.Array) {
-                GenericData.Array array = ((GenericData.Array) o);
-                // The type information in this case is interpreted as a List
-                TypeInfo listTypeInfo = ((ListTypeInfo) 
typeInfo).getListElementTypeInfo();
-                return array.stream().map((element) -> 
convertToORCObject(listTypeInfo, element, 
hiveFieldNames)).collect(Collectors.toList());
-            }
             if (o instanceof List) {
                 return o;
             }
+            if (o instanceof Record) {
+                Record record = (Record) o;
+                TypeInfo recordSchema = 
NiFiOrcUtils.getOrcSchema(record.getSchema(), hiveFieldNames);
+                List<RecordField> recordFields = 
record.getSchema().getFields();
+                if (recordFields != null) {
+                    Object[] fieldObjects = new Object[recordFields.size()];
+                    for (int i = 0; i < recordFields.size(); i++) {
+                        RecordField field = recordFields.get(i);
+                        DataType dataType = field.getDataType();
+                        Object fieldObject = record.getValue(field);
+                        fieldObjects[i] = 
convertToORCObject(NiFiOrcUtils.getOrcField(dataType, hiveFieldNames), 
fieldObject, hiveFieldNames);
+                    }
+                    return NiFiOrcUtils.createOrcStruct(recordSchema, 
fieldObjects);
+                }
+                return null;
+            }
             if (o instanceof Map) {
                 Map map = new HashMap();
-                TypeInfo keyInfo = ((MapTypeInfo) 
typeInfo).getMapKeyTypeInfo();
-                TypeInfo valueInfo = ((MapTypeInfo) 
typeInfo).getMapValueTypeInfo();
+                MapTypeInfo mapTypeInfo = ((MapTypeInfo) typeInfo);
+                TypeInfo keyInfo = mapTypeInfo.getMapKeyTypeInfo();
+                TypeInfo valueInfo = mapTypeInfo.getMapValueTypeInfo();
                 // Unions are not allowed as key/value types, so if we convert 
the key and value objects,
                 // they should return Writable objects
                 ((Map) o).forEach((key, value) -> {
@@ -188,21 +206,6 @@ public class NiFiOrcUtils {
                 });
                 return map;
             }
-            if (o instanceof GenericData.Record) {
-                GenericData.Record record = (GenericData.Record) o;
-                TypeInfo recordSchema = 
NiFiOrcUtils.getOrcField(record.getSchema(), hiveFieldNames);
-                List<Schema.Field> recordFields = 
record.getSchema().getFields();
-                if (recordFields != null) {
-                    Object[] fieldObjects = new Object[recordFields.size()];
-                    for (int i = 0; i < recordFields.size(); i++) {
-                        Schema.Field field = recordFields.get(i);
-                        Schema fieldSchema = field.schema();
-                        Object fieldObject = record.get(field.name());
-                        fieldObjects[i] = 
NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldSchema, 
hiveFieldNames), fieldObject, hiveFieldNames);
-                    }
-                    return NiFiOrcUtils.createOrcStruct(recordSchema, 
fieldObjects);
-                }
-            }
             throw new IllegalArgumentException("Error converting object of 
type " + o.getClass().getName() + " to ORC type " + typeInfo.getTypeName());
         } else {
             return null;
@@ -234,247 +237,228 @@ public class NiFiOrcUtils {
         return name.replaceAll("[\\. ]", "_");
     }
 
-    public static String generateHiveDDL(Schema avroSchema, String tableName, 
boolean hiveFieldNames) {
-        Schema.Type schemaType = avroSchema.getType();
-        StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT 
EXISTS ");
+    public static String generateHiveDDL(RecordSchema recordSchema, String 
tableName, boolean hiveFieldNames) {
+        StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT 
EXISTS `");
         sb.append(tableName);
-        sb.append(" (");
-        if (Schema.Type.RECORD.equals(schemaType)) {
-            List<String> hiveColumns = new ArrayList<>();
-            List<Schema.Field> fields = avroSchema.getFields();
-            if (fields != null) {
-                hiveColumns.addAll(
-                        fields.stream().map(field -> (hiveFieldNames ? 
field.name().toLowerCase() : field.name()) + " "
-                                + getHiveTypeFromAvroType(field.schema(), 
hiveFieldNames)).collect(Collectors.toList()));
-            }
-            sb.append(StringUtils.join(hiveColumns, ", "));
-            sb.append(") STORED AS ORC");
-            return sb.toString();
-        } else {
-            throw new IllegalArgumentException("Avro schema is of type " + 
schemaType.getName() + ", not RECORD");
+        sb.append("` (");
+        List<String> hiveColumns = new ArrayList<>();
+        List<RecordField> fields = recordSchema.getFields();
+        if (fields != null) {
+            hiveColumns.addAll(
+                    fields.stream().map(field -> "`" + (hiveFieldNames ? 
field.getFieldName().toLowerCase() : field.getFieldName()) + "` "
+                            + getHiveTypeFromFieldType(field.getDataType(), 
hiveFieldNames)).collect(Collectors.toList()));
         }
-    }
+        sb.append(StringUtils.join(hiveColumns, ", "));
+        sb.append(") STORED AS ORC");
+        return sb.toString();
 
+    }
 
-    public static TypeInfo getOrcField(Schema fieldSchema, boolean 
hiveFieldNames) throws IllegalArgumentException {
-        Schema.Type fieldType = fieldSchema.getType();
-        LogicalType logicalType = fieldSchema.getLogicalType();
-
-        switch (fieldType) {
-            case INT:
-            case LONG:
-                // Handle logical types
-                if (logicalType != null) {
-                    if (LogicalTypes.date().equals(logicalType)) {
-                        return TypeInfoFactory.dateTypeInfo;
-                    } else if (LogicalTypes.timeMicros().equals(logicalType)) {
-                        // Time micros isn't supported by our Record Field 
types (see AvroTypeUtil)
-                        throw new IllegalArgumentException("time-micros is not 
a supported field type");
-                    } else if (LogicalTypes.timeMillis().equals(logicalType)) {
-                        return TypeInfoFactory.intTypeInfo;
-                    } else if 
(LogicalTypes.timestampMicros().equals(logicalType)) {
-                        // Timestamp micros isn't supported by our Record 
Field types (see AvroTypeUtil)
-                        throw new IllegalArgumentException("timestamp-micros 
is not a supported field type");
-                    } else if 
(LogicalTypes.timestampMillis().equals(logicalType)) {
-                        return TypeInfoFactory.timestampTypeInfo;
-                    }
-                }
-                return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
-            case BYTES:
-                // Handle logical types
-                if (logicalType != null) {
-                    if (logicalType instanceof LogicalTypes.Decimal) {
-                        return TypeInfoFactory.doubleTypeInfo;
-                    }
-                }
-                return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
-
-            case BOOLEAN:
-            case DOUBLE:
-            case FLOAT:
-            case STRING:
-                return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
-
-            case UNION:
-                List<Schema> unionFieldSchemas = fieldSchema.getTypes();
-
-                if (unionFieldSchemas != null) {
-                    // Ignore null types in union
-                    List<TypeInfo> orcFields = 
unionFieldSchemas.stream().filter(
-                            unionFieldSchema -> 
!Schema.Type.NULL.equals(unionFieldSchema.getType()))
-                            .map((it) -> NiFiOrcUtils.getOrcField(it, 
hiveFieldNames))
-                            .collect(Collectors.toList());
-
-                    // Flatten the field if the union only has one non-null 
element
-                    if (orcFields.size() == 1) {
-                        return orcFields.get(0);
-                    } else {
-                        return TypeInfoFactory.getUnionTypeInfo(orcFields);
-                    }
-                }
-                return null;
-
-            case ARRAY:
-                return 
TypeInfoFactory.getListTypeInfo(getOrcField(fieldSchema.getElementType(), 
hiveFieldNames));
-
-            case MAP:
-                return TypeInfoFactory.getMapTypeInfo(
-                        
getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING),
-                        getOrcField(fieldSchema.getValueType(), 
hiveFieldNames));
-
-            case RECORD:
-                List<Schema.Field> avroFields = fieldSchema.getFields();
-                if (avroFields != null) {
-                    List<String> orcFieldNames = new 
ArrayList<>(avroFields.size());
-                    List<TypeInfo> orcFields = new 
ArrayList<>(avroFields.size());
-                    avroFields.forEach(avroField -> {
-                        String fieldName = hiveFieldNames ? 
avroField.name().toLowerCase() : avroField.name();
-                        orcFieldNames.add(fieldName);
-                        orcFields.add(getOrcField(avroField.schema(), 
hiveFieldNames));
-                    });
-                    return TypeInfoFactory.getStructTypeInfo(orcFieldNames, 
orcFields);
-                }
-                return null;
+    public static TypeInfo getOrcSchema(RecordSchema recordSchema, boolean 
hiveFieldNames) throws IllegalArgumentException {
+        List<RecordField> recordFields = recordSchema.getFields();
+        if (recordFields != null) {
+            List<String> orcFieldNames = new ArrayList<>(recordFields.size());
+            List<TypeInfo> orcFields = new ArrayList<>(recordFields.size());
+            recordFields.forEach(recordField -> {
+                String fieldName = hiveFieldNames ? 
recordField.getFieldName().toLowerCase() : recordField.getFieldName();
+                orcFieldNames.add(fieldName);
+                orcFields.add(getOrcField(recordField.getDataType(), 
hiveFieldNames));
+            });
+            return TypeInfoFactory.getStructTypeInfo(orcFieldNames, orcFields);
+        }
+        return null;
+    }
 
-            case ENUM:
-                // An enum value is just a String for ORC/Hive
-                return 
getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING);
 
-            default:
-                throw new IllegalArgumentException("Did not recognize Avro 
type " + fieldType.getName());
+    public static TypeInfo getOrcField(DataType dataType, boolean 
hiveFieldNames) throws IllegalArgumentException {
+        if (dataType == null) {
+            return null;
         }
 
-    }
+        RecordFieldType fieldType = dataType.getFieldType();
+        if (RecordFieldType.INT.equals(fieldType)
+                || RecordFieldType.LONG.equals(fieldType)
+                || RecordFieldType.BOOLEAN.equals(fieldType)
+                || RecordFieldType.DOUBLE.equals(fieldType)
+                || RecordFieldType.FLOAT.equals(fieldType)
+                || RecordFieldType.STRING.equals(fieldType)) {
+            return getPrimitiveOrcTypeFromPrimitiveFieldType(dataType);
+        }
+        if (RecordFieldType.DATE.equals(fieldType)) {
+            return TypeInfoFactory.dateTypeInfo;
+        }
+        if (RecordFieldType.TIME.equals(fieldType)) {
+            return TypeInfoFactory.intTypeInfo;
+        }
+        if (RecordFieldType.TIMESTAMP.equals(fieldType)) {
+            return TypeInfoFactory.timestampTypeInfo;
+        }
+        if (RecordFieldType.ARRAY.equals(fieldType)) {
+            ArrayDataType arrayDataType = (ArrayDataType) dataType;
+            if 
(RecordFieldType.BYTE.getDataType().equals(arrayDataType.getElementType())) {
+                return TypeInfoFactory.getPrimitiveTypeInfo("binary");
+            }
+            return 
TypeInfoFactory.getListTypeInfo(getOrcField(arrayDataType.getElementType(), 
hiveFieldNames));
+        }
+        if (RecordFieldType.CHOICE.equals(fieldType)) {
+            ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+            List<DataType> unionFieldSchemas = 
choiceDataType.getPossibleSubTypes();
+
+            if (unionFieldSchemas != null) {
+                // Ignore null types in union
+                List<TypeInfo> orcFields = unionFieldSchemas.stream()
+                        .map((it) -> NiFiOrcUtils.getOrcField(it, 
hiveFieldNames))
+                        .collect(Collectors.toList());
 
-    public static Schema.Type getAvroSchemaTypeOfObject(Object o) {
-        if (o == null) {
-            return Schema.Type.NULL;
-        } else if (o instanceof Integer) {
-            return Schema.Type.INT;
-        } else if (o instanceof Long) {
-            return Schema.Type.LONG;
-        } else if (o instanceof Boolean) {
-            return Schema.Type.BOOLEAN;
-        } else if (o instanceof byte[]) {
-            return Schema.Type.BYTES;
-        } else if (o instanceof Float) {
-            return Schema.Type.FLOAT;
-        } else if (o instanceof Double) {
-            return Schema.Type.DOUBLE;
-        } else if (o instanceof Enum) {
-            return Schema.Type.ENUM;
-        } else if (o instanceof Object[]) {
-            return Schema.Type.ARRAY;
-        } else if (o instanceof List) {
-            return Schema.Type.ARRAY;
-        } else if (o instanceof Map) {
-            return Schema.Type.MAP;
-        } else {
-            throw new IllegalArgumentException("Object of class " + 
o.getClass() + " is not a supported Avro Type");
+                // Flatten the field if the union only has one non-null element
+                if (orcFields.size() == 1) {
+                    return orcFields.get(0);
+                } else {
+                    return TypeInfoFactory.getUnionTypeInfo(orcFields);
+                }
+            }
+            return null;
+        }
+        if (RecordFieldType.MAP.equals(fieldType)) {
+            MapDataType mapDataType = (MapDataType) dataType;
+            return TypeInfoFactory.getMapTypeInfo(
+                    
getPrimitiveOrcTypeFromPrimitiveFieldType(RecordFieldType.STRING.getDataType()),
+                    getOrcField(mapDataType.getValueType(), hiveFieldNames));
         }
+        if (RecordFieldType.RECORD.equals(fieldType)) {
+            RecordDataType recordDataType = (RecordDataType) dataType;
+            List<RecordField> recordFields = 
recordDataType.getChildSchema().getFields();
+            if (recordFields != null) {
+                List<String> orcFieldNames = new 
ArrayList<>(recordFields.size());
+                List<TypeInfo> orcFields = new 
ArrayList<>(recordFields.size());
+                recordFields.forEach(recordField -> {
+                    String fieldName = hiveFieldNames ? 
recordField.getFieldName().toLowerCase() : recordField.getFieldName();
+                    orcFieldNames.add(fieldName);
+                    orcFields.add(getOrcField(recordField.getDataType(), 
hiveFieldNames));
+                });
+                return TypeInfoFactory.getStructTypeInfo(orcFieldNames, 
orcFields);
+            }
+            return null;
+        }
+
+        throw new IllegalArgumentException("Did not recognize field type " + 
fieldType.name());
     }
 
-    public static TypeInfo 
getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type avroType) throws 
IllegalArgumentException {
-        if (avroType == null) {
+    public static TypeInfo getPrimitiveOrcTypeFromPrimitiveFieldType(DataType 
rawDataType) throws IllegalArgumentException {
+        if (rawDataType == null) {
             throw new IllegalArgumentException("Avro type is null");
         }
-        switch (avroType) {
-            case INT:
-                return TypeInfoFactory.getPrimitiveTypeInfo("int");
-            case LONG:
-                return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
-            case BOOLEAN:
-                return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
-            case BYTES:
-                return TypeInfoFactory.getPrimitiveTypeInfo("binary");
-            case DOUBLE:
-                return TypeInfoFactory.getPrimitiveTypeInfo("double");
-            case FLOAT:
-                return TypeInfoFactory.getPrimitiveTypeInfo("float");
-            case STRING:
-                return TypeInfoFactory.getPrimitiveTypeInfo("string");
-            default:
-                throw new IllegalArgumentException("Avro type " + 
avroType.getName() + " is not a primitive type");
+        RecordFieldType fieldType = rawDataType.getFieldType();
+        if (RecordFieldType.INT.equals(fieldType)) {
+            return TypeInfoFactory.getPrimitiveTypeInfo("int");
+        }
+        if (RecordFieldType.LONG.equals(fieldType)) {
+            return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
         }
+        if (RecordFieldType.BOOLEAN.equals(fieldType)) {
+            return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
+        }
+        if (RecordFieldType.DOUBLE.equals(fieldType)) {
+            return TypeInfoFactory.getPrimitiveTypeInfo("double");
+        }
+        if (RecordFieldType.FLOAT.equals(fieldType)) {
+            return TypeInfoFactory.getPrimitiveTypeInfo("float");
+        }
+        if (RecordFieldType.STRING.equals(fieldType)) {
+            return TypeInfoFactory.getPrimitiveTypeInfo("string");
+        }
+
+        throw new IllegalArgumentException("Field type " + fieldType.name() + 
" is not a primitive type");
     }
 
-    public static String getHiveTypeFromAvroType(Schema avroSchema, boolean 
hiveFieldNames) {
-        if (avroSchema == null) {
-            throw new IllegalArgumentException("Avro schema is null");
+    public static String getHiveSchema(RecordSchema recordSchema, boolean 
hiveFieldNames) throws IllegalArgumentException {
+        List<RecordField> recordFields = recordSchema.getFields();
+        if (recordFields != null) {
+            List<String> hiveFields = new ArrayList<>(recordFields.size());
+            recordFields.forEach(recordField -> {
+                hiveFields.add((hiveFieldNames ? 
recordField.getFieldName().toLowerCase() : recordField.getFieldName())
+                        + ":" + 
getHiveTypeFromFieldType(recordField.getDataType(), hiveFieldNames));
+            });
+            return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">";
         }
+        return null;
+    }
 
-        Schema.Type avroType = avroSchema.getType();
-        LogicalType logicalType = avroSchema.getLogicalType();
+    public static String getHiveTypeFromFieldType(DataType rawDataType, 
boolean hiveFieldNames) {
+        if (rawDataType == null) {
+            throw new IllegalArgumentException("Field type is null");
+        }
+        RecordFieldType dataType = rawDataType.getFieldType();
 
-        switch (avroType) {
-            case INT:
-                if (logicalType != null) {
-                    if (LogicalTypes.date().equals(logicalType)) {
-                        return "DATE";
-                    }
-                    // Time-millis has no current corresponding Hive type, 
perhaps an INTERVAL type when that is fully supported.
-                }
-                return "INT";
-            case LONG:
-                if (logicalType != null) {
-                    if (LogicalTypes.timestampMillis().equals(logicalType)) {
-                        return "TIMESTAMP";
-                    }
-                    // Timestamp-micros and time-micros are not supported by 
our Record Field type system
-                }
-                return "BIGINT";
-            case BOOLEAN:
-                return "BOOLEAN";
-            case BYTES:
-                if (logicalType != null) {
-                    if (logicalType instanceof LogicalTypes.Decimal) {
-                        return "DOUBLE";
-                    }
-                }
+        if (RecordFieldType.INT.equals(dataType)) {
+            return "INT";
+        }
+        if (RecordFieldType.LONG.equals(dataType)) {
+            return "BIGINT";
+        }
+        if (RecordFieldType.BOOLEAN.equals(dataType)) {
+            return "BOOLEAN";
+        }
+        if (RecordFieldType.DOUBLE.equals(dataType)) {
+            return "DOUBLE";
+        }
+        if (RecordFieldType.FLOAT.equals(dataType)) {
+            return "FLOAT";
+        }
+        if (RecordFieldType.STRING.equals(dataType)) {
+            return "STRING";
+        }
+        if (RecordFieldType.DATE.equals(dataType)) {
+            return "DATE";
+        }
+        if (RecordFieldType.TIME.equals(dataType)) {
+            return "INT";
+        }
+        if (RecordFieldType.TIMESTAMP.equals(dataType)) {
+            return "TIMESTAMP";
+        }
+        if (RecordFieldType.ARRAY.equals(dataType)) {
+            ArrayDataType arrayDataType = (ArrayDataType) rawDataType;
+            if 
(RecordFieldType.BYTE.getDataType().equals(arrayDataType.getElementType())) {
                 return "BINARY";
-            case DOUBLE:
-                return "DOUBLE";
-            case FLOAT:
-                return "FLOAT";
-            case STRING:
-            case ENUM:
-                return "STRING";
-            case UNION:
-                List<Schema> unionFieldSchemas = avroSchema.getTypes();
-                if (unionFieldSchemas != null) {
-                    List<String> hiveFields = new ArrayList<>();
-                    for (Schema unionFieldSchema : unionFieldSchemas) {
-                        Schema.Type unionFieldSchemaType = 
unionFieldSchema.getType();
-                        // Ignore null types in union
-                        if (!Schema.Type.NULL.equals(unionFieldSchemaType)) {
-                            
hiveFields.add(getHiveTypeFromAvroType(unionFieldSchema, hiveFieldNames));
-                        }
-                    }
-                    // Flatten the field if the union only has one non-null 
element
-                    return (hiveFields.size() == 1)
-                            ? hiveFields.get(0)
-                            : "UNIONTYPE<" + StringUtils.join(hiveFields, ", 
") + ">";
+            }
+            return "ARRAY<" + 
getHiveTypeFromFieldType(arrayDataType.getElementType(), hiveFieldNames) + ">";
+        }
+        if (RecordFieldType.MAP.equals(dataType)) {
+            MapDataType mapDataType = (MapDataType) rawDataType;
+            return "MAP<STRING, " + 
getHiveTypeFromFieldType(mapDataType.getValueType(), hiveFieldNames) + ">";
+        }
+        if (RecordFieldType.CHOICE.equals(dataType)) {
+            ChoiceDataType choiceDataType = (ChoiceDataType) rawDataType;
+            List<DataType> unionFieldSchemas = 
choiceDataType.getPossibleSubTypes();
+
+            if (unionFieldSchemas != null) {
+                // Ignore null types in union
+                List<String> hiveFields = unionFieldSchemas.stream()
+                        .map((it) -> getHiveTypeFromFieldType(it, 
hiveFieldNames))
+                        .collect(Collectors.toList());
 
-                }
-                break;
-            case MAP:
-                return "MAP<STRING, " + 
getHiveTypeFromAvroType(avroSchema.getValueType(), hiveFieldNames) + ">";
-            case ARRAY:
-                return "ARRAY<" + 
getHiveTypeFromAvroType(avroSchema.getElementType(), hiveFieldNames) + ">";
-            case RECORD:
-                List<Schema.Field> recordFields = avroSchema.getFields();
-                if (recordFields != null) {
-                    List<String> hiveFields = recordFields.stream().map(
-                            recordField -> (hiveFieldNames ? 
recordField.name().toLowerCase() : recordField.name()) + ":"
-                                    + 
getHiveTypeFromAvroType(recordField.schema(), 
hiveFieldNames)).collect(Collectors.toList());
-                    return "STRUCT<" + StringUtils.join(hiveFields, ", ") + 
">";
-                }
-                break;
-            default:
-                break;
+                // Flatten the field if the union only has one non-null element
+                return (hiveFields.size() == 1)
+                        ? hiveFields.get(0)
+                        : "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + 
">";
+            }
+            return null;
+        }
+
+        if (RecordFieldType.RECORD.equals(dataType)) {
+            RecordDataType recordDataType = (RecordDataType) rawDataType;
+            List<RecordField> recordFields = 
recordDataType.getChildSchema().getFields();
+            if (recordFields != null) {
+                List<String> hiveFields = recordFields.stream().map(
+                        recordField -> ("`" + (hiveFieldNames ? 
recordField.getFieldName().toLowerCase() : recordField.getFieldName()) + "`:"
+                                + 
getHiveTypeFromFieldType(recordField.getDataType(), 
hiveFieldNames))).collect(Collectors.toList());
+                return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">";
+            }
+            return null;
         }
 
-        throw new IllegalArgumentException("Error converting Avro type " + 
avroType.getName() + " to Hive type");
+        throw new IllegalArgumentException("Error converting Avro type " + 
dataType.name() + " to Hive type");
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
index a0a5d13..9af566a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.orc;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
@@ -30,7 +29,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -157,19 +155,17 @@ public class PutORC extends AbstractPutHDFSRecord {
     public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext 
context, final FlowFile flowFile, final Configuration conf, final Path path, 
final RecordSchema schema)
             throws IOException, SchemaNotFoundException {
 
-        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
-
         final long stripeSize = 
context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue();
         final int bufferSize = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final CompressionKind compressionType = 
CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue());
         final boolean normalizeForHive = 
context.getProperty(HIVE_FIELD_NAMES).asBoolean();
-        TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, 
normalizeForHive);
+        TypeInfo orcSchema = NiFiOrcUtils.getOrcSchema(schema, 
normalizeForHive);
         final Writer orcWriter = NiFiOrcUtils.createWriter(path, conf, 
orcSchema, stripeSize, compressionType, bufferSize);
         final String hiveTableName = 
context.getProperty(HIVE_TABLE_NAME).isSet()
                 ? 
context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue()
-                : 
NiFiOrcUtils.normalizeHiveTableName(avroSchema.getFullName());
+                : 
NiFiOrcUtils.normalizeHiveTableName(schema.getIdentifier().getName().orElse("unknown"));
         final boolean hiveFieldNames = 
context.getProperty(HIVE_FIELD_NAMES).asBoolean();
 
-        return new ORCHDFSRecordWriter(orcWriter, avroSchema, hiveTableName, 
hiveFieldNames);
+        return new ORCHDFSRecordWriter(orcWriter, schema, hiveTableName, 
hiveFieldNames);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java
index bd386a0..3adfb80 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java
@@ -16,13 +16,15 @@
  */
 package org.apache.nifi.processors.orc.record;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
 import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
 import java.io.IOException;
@@ -33,27 +35,27 @@ import java.util.Map;
 import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE;
 
 /**
- * HDFSRecordWriter that writes ORC files using Avro as the schema 
representation.
+ * HDFSRecordWriter that writes ORC files using the NiFi Record API as the 
schema representation.
  */
 
 public class ORCHDFSRecordWriter implements HDFSRecordWriter {
 
-    private final Schema avroSchema;
+    private final RecordSchema recordSchema;
     private final TypeInfo orcSchema;
     private final Writer orcWriter;
     private final String hiveTableName;
     private final boolean hiveFieldNames;
-    private final List<Schema.Field> recordFields;
+    private final List<RecordField> recordFields;
     private final int numRecordFields;
     private Object[] workingRow;
 
-    public ORCHDFSRecordWriter(final Writer orcWriter, final Schema 
avroSchema, final String hiveTableName, final boolean hiveFieldNames) {
-        this.avroSchema = avroSchema;
+    public ORCHDFSRecordWriter(final Writer orcWriter, final RecordSchema 
recordSchema, final String hiveTableName, final boolean hiveFieldNames) {
+        this.recordSchema = recordSchema;
         this.orcWriter = orcWriter;
         this.hiveFieldNames = hiveFieldNames;
-        this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, 
this.hiveFieldNames);
+        this.orcSchema = NiFiOrcUtils.getOrcSchema(recordSchema, 
this.hiveFieldNames);
         this.hiveTableName = hiveTableName;
-        this.recordFields = avroSchema != null ? avroSchema.getFields() : null;
+        this.recordFields = recordSchema != null ? recordSchema.getFields() : 
null;
         this.numRecordFields = recordFields != null ? recordFields.size() : -1;
         // Reuse row object
         this.workingRow = numRecordFields > -1 ? new Object[numRecordFields] : 
null;
@@ -63,17 +65,18 @@ public class ORCHDFSRecordWriter implements 
HDFSRecordWriter {
     public void write(final Record record) throws IOException {
         if (recordFields != null) {
             for (int i = 0; i < numRecordFields; i++) {
-                final Schema.Field field = recordFields.get(i);
-                final Schema fieldSchema = field.schema();
-                final String fieldName = field.name();
-                Object o = record.getValue(fieldName);
+                final RecordField field = recordFields.get(i);
+                final DataType fieldType = field.getDataType();
+                final String fieldName = field.getFieldName();
+                Object o = record.getValue(field);
                 try {
-                    workingRow[i] = 
NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldSchema, 
hiveFieldNames), o, hiveFieldNames);
+                    workingRow[i] = 
NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldType, 
hiveFieldNames), o, hiveFieldNames);
                 } catch (ArrayIndexOutOfBoundsException aioobe) {
                     final String errorMsg = "Index out of bounds for column " 
+ i + ", type " + fieldName + ", and object " + o.toString();
                     throw new IOException(errorMsg, aioobe);
                 }
             }
+
             orcWriter.addRow(NiFiOrcUtils.createOrcStruct(orcSchema, 
workingRow));
         }
     }
@@ -93,7 +96,7 @@ public class ORCHDFSRecordWriter implements HDFSRecordWriter {
         }
 
         // Add Hive DDL Attribute
-        String hiveDDL = NiFiOrcUtils.generateHiveDDL(avroSchema, 
hiveTableName, hiveFieldNames);
+        String hiveDDL = NiFiOrcUtils.generateHiveDDL(recordSchema, 
hiveTableName, hiveFieldNames);
         Map<String, String> attributes = new HashMap<String, String>() {{
             put(HIVE_DDL_ATTRIBUTE, hiveDDL);
         }};

http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index 3cc6fba..cbb1cf7 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -46,6 +46,7 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -162,7 +163,7 @@ public class PutORCTest {
         mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), 
filename);
         mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100");
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
-                "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (name STRING, 
favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS ORC");
+                "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`name` STRING, 
`favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS ORC");
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();
@@ -233,7 +234,7 @@ public class PutORCTest {
         mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10");
         // DDL will be created with field names normalized (lowercased, e.g.) 
for Hive by default
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
-                "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (id INT, 
timemillis INT, timestampmillis TIMESTAMP, dt DATE, dec DOUBLE) STORED AS ORC");
+                "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, 
`timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DOUBLE) STORED 
AS ORC");
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();
@@ -384,6 +385,41 @@ public class PutORCTest {
         Assert.assertFalse(tempAvroORCFile.exists());
     }
 
+    @Test
+    public void testNestedRecords() throws Exception {
+        testRunner = TestRunners.newTestRunner(proc);
+        testRunner.setProperty(PutORC.HADOOP_CONFIGURATION_RESOURCES, 
TEST_CONF_PATH);
+        testRunner.setProperty(PutORC.DIRECTORY, DIRECTORY);
+        // Scenario: run PutORC against a mocked reader whose one record is built from the schema of "myField"
+        MockRecordParser readerFactory = new MockRecordParser();
+        // Read the Avro schema text from the test resource (NOTE(review): this FileInputStream is never explicitly closed)
+        final String avroSchema = IOUtils.toString(new 
FileInputStream("src/test/resources/nested_record.avsc"), 
StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+        // Convert the Avro schema to a NiFi RecordSchema and register each of its fields with the mock reader
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField);
+        }
+        // Values for the nested record, keyed "id" and "x"
+        Map<String,Object> nestedRecordMap = new HashMap<>();
+        nestedRecordMap.put("id", 11088000000001615L);
+        nestedRecordMap.put("x", "Hello World!");
+        // Type the nested record with the schema of the "myField" field taken from the parsed Avro schema
+        RecordSchema nestedRecordSchema = 
AvroTypeUtil.createSchema(schema.getField("myField").schema());
+        MapRecord nestedRecord = new MapRecord(nestedRecordSchema, 
nestedRecordMap);
+        // This gets added in to its spot in the schema, which is already 
named "myField"
+        readerFactory.addRecord(nestedRecord);
+        // Register and enable the mock reader as a controller service under the id the processor will reference
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        // Point the processor's record reader property at that service id
+        testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
+        // Enqueue one flow file, run the processor, and require that exactly one flow file reaches REL_SUCCESS
+        testRunner.enqueue("trigger");
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+    }
+
     private void verifyORCUsers(final Path orcUsers, final int 
numExpectedUsers) throws IOException {
         verifyORCUsers(orcUsers, numExpectedUsers, null);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index 4682d76..bde6af8 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -20,7 +20,6 @@ package org.apache.nifi.util.orc;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.util.Utf8;
 import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -31,6 +30,12 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -61,20 +66,19 @@ public class TestNiFiOrcUtils {
         };
 
         // Build a fake Avro record with all types
-        Schema testSchema = buildPrimitiveAvroSchema();
-        List<Schema.Field> fields = testSchema.getFields();
+        RecordSchema testSchema = buildPrimitiveRecordSchema();
+        List<RecordField> fields = testSchema.getFields();
         for (int i = 0; i < fields.size(); i++) {
-            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getOrcField(fields.get(i).schema(), false));
+            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getOrcField(fields.get(i).getDataType(), false));
         }
-
     }
 
     @Test
     public void test_getOrcField_union_optional_type() {
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("testRecord").namespace("any.data").fields();
         
builder.name("union").type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
-        Schema testSchema = builder.endRecord();
-        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("union").schema(), false);
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
+        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("union").get().getDataType(), 
false);
         assertEquals(TypeInfoCreator.createBoolean(), orcType);
     }
 
@@ -82,8 +86,8 @@ public class TestNiFiOrcUtils {
     public void test_getOrcField_union() {
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("testRecord").namespace("any.data").fields();
         
builder.name("union").type().unionOf().intType().and().booleanType().endUnion().noDefault();
-        Schema testSchema = builder.endRecord();
-        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("union").schema(), false);
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
+        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("union").get().getDataType(), 
false);
         assertEquals(
                 TypeInfoFactory.getUnionTypeInfo(Arrays.asList(
                         TypeInfoCreator.createInt(),
@@ -95,8 +99,8 @@ public class TestNiFiOrcUtils {
     public void test_getOrcField_map() {
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("testRecord").namespace("any.data").fields();
         builder.name("map").type().map().values().doubleType().noDefault();
-        Schema testSchema = builder.endRecord();
-        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("map").schema(), true);
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
+        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("map").get().getDataType(), true);
         assertEquals(
                 TypeInfoFactory.getMapTypeInfo(
                         TypeInfoCreator.createString(),
@@ -108,8 +112,8 @@ public class TestNiFiOrcUtils {
     public void test_getOrcField_nested_map() {
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("testRecord").namespace("any.data").fields();
         
builder.name("map").type().map().values().map().values().doubleType().noDefault();
-        Schema testSchema = builder.endRecord();
-        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("map").schema(), false);
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
+        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("map").get().getDataType(), false);
         assertEquals(
                 TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(),
                         
TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(), 
TypeInfoCreator.createDouble())),
@@ -120,8 +124,8 @@ public class TestNiFiOrcUtils {
     public void test_getOrcField_array() {
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("testRecord").namespace("any.data").fields();
         builder.name("array").type().array().items().longType().noDefault();
-        Schema testSchema = builder.endRecord();
-        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("array").schema(), false);
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
+        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("array").get().getDataType(), 
false);
         assertEquals(
                 TypeInfoFactory.getListTypeInfo(TypeInfoCreator.createLong()),
                 orcType);
@@ -131,8 +135,8 @@ public class TestNiFiOrcUtils {
     public void test_getOrcField_complex_array() {
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("testRecord").namespace("any.data").fields();
         
builder.name("Array").type().array().items().map().values().floatType().noDefault();
-        Schema testSchema = builder.endRecord();
-        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("Array").schema(), true);
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
+        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("Array").get().getDataType(), 
true);
         assertEquals(
                 
TypeInfoFactory.getListTypeInfo(TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(),
 TypeInfoCreator.createFloat())),
                 orcType);
@@ -144,9 +148,9 @@ public class TestNiFiOrcUtils {
         builder.name("Int").type().intType().noDefault();
         builder.name("Long").type().longType().longDefault(1L);
         builder.name("Array").type().array().items().stringType().noDefault();
-        Schema testSchema = builder.endRecord();
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
         // Normalize field names for Hive, assert that their names are now 
lowercase
-        TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema, true);
+        TypeInfo orcType = NiFiOrcUtils.getOrcSchema(testSchema, true);
         assertEquals(
                 TypeInfoFactory.getStructTypeInfo(
                         Arrays.asList("int", "long", "array"),
@@ -161,13 +165,13 @@ public class TestNiFiOrcUtils {
     public void test_getOrcField_enum() {
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("testRecord").namespace("any.data").fields();
         builder.name("enumField").type().enumeration("enum").symbols("a", "b", 
"c").enumDefault("a");
-        Schema testSchema = builder.endRecord();
-        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("enumField").schema(), true);
+        RecordSchema testSchema = 
AvroTypeUtil.createSchema(builder.endRecord());
+        TypeInfo orcType = 
NiFiOrcUtils.getOrcField(testSchema.getField("enumField").get().getDataType(), 
true);
         assertEquals(TypeInfoCreator.createString(), orcType);
     }
 
     @Test
-    public void test_getPrimitiveOrcTypeFromPrimitiveAvroType() {
+    public void test_getPrimitiveOrcTypeFromPrimitiveFieldType() {
         // Expected ORC types
         TypeInfo[] expectedTypes = {
                 TypeInfoCreator.createInt(),
@@ -179,17 +183,20 @@ public class TestNiFiOrcUtils {
                 TypeInfoCreator.createString(),
         };
 
-        Schema testSchema = buildPrimitiveAvroSchema();
-        List<Schema.Field> fields = testSchema.getFields();
+        RecordSchema testSchema = buildPrimitiveRecordSchema();
+        List<RecordField> fields = testSchema.getFields();
         for (int i = 0; i < fields.size(); i++) {
-            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(fields.get(i).schema().getType()));
+            // Skip the "bytes" field (index 5): Binary is a primitive type in Avro but a complex 
type (array[byte]) in the NiFi Record API
+            if (i == 5) {
+                continue;
+            }
+            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveFieldType(fields.get(i).getDataType()));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void test_getPrimitiveOrcTypeFromPrimitiveAvroType_badType() {
-        Schema.Type nonPrimitiveType = Schema.Type.ARRAY;
-        
NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(nonPrimitiveType);
+    public void test_getPrimitiveOrcTypeFromPrimitiveFieldType_badType() {
+        
NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveFieldType(RecordFieldType.ARRAY.getDataType());
     }
 
     @Test
@@ -213,7 +220,7 @@ public class TestNiFiOrcUtils {
     }
 
     @Test
-    public void test_getHiveTypeFromAvroType_primitive() {
+    public void test_getHiveTypeFromFieldType_primitive() {
         // Expected ORC types
         String[] expectedTypes = {
                 "INT",
@@ -225,15 +232,15 @@ public class TestNiFiOrcUtils {
                 "STRING",
         };
 
-        Schema testSchema = buildPrimitiveAvroSchema();
-        List<Schema.Field> fields = testSchema.getFields();
+        RecordSchema testSchema = buildPrimitiveRecordSchema();
+        List<RecordField> fields = testSchema.getFields();
         for (int i = 0; i < fields.size(); i++) {
-            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema(), false));
+            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getHiveTypeFromFieldType(fields.get(i).getDataType(), false));
         }
     }
 
     @Test
-    public void test_getHiveTypeFromAvroType_complex() {
+    public void test_getHiveTypeFromFieldType_complex() {
         // Expected ORC types
         String[] expectedTypes = {
                 "INT",
@@ -243,46 +250,45 @@ public class TestNiFiOrcUtils {
                 "ARRAY<INT>"
         };
 
-        Schema testSchema = buildComplexAvroSchema();
-        List<Schema.Field> fields = testSchema.getFields();
+        RecordSchema testSchema = buildComplexRecordSchema();
+        List<RecordField> fields = testSchema.getFields();
         for (int i = 0; i < fields.size(); i++) {
-            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema(), false));
+            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getHiveTypeFromFieldType(fields.get(i).getDataType(), false));
         }
 
         assertEquals("STRUCT<myInt:INT, myMap:MAP<STRING, DOUBLE>, 
myEnum:STRING, myLongOrFloat:UNIONTYPE<BIGINT, FLOAT>, myIntList:ARRAY<INT>>",
-                NiFiOrcUtils.getHiveTypeFromAvroType(testSchema, false));
+                NiFiOrcUtils.getHiveSchema(testSchema, false));
     }
 
     @Test
     public void test_generateHiveDDL_primitive() {
-        Schema avroSchema = buildPrimitiveAvroSchema();
-        String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable", 
false);
-        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable (int 
INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, 
string STRING)"
+        RecordSchema schema = buildPrimitiveRecordSchema();
+        String ddl = NiFiOrcUtils.generateHiveDDL(schema, "myHiveTable", 
false);
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `myHiveTable` (`int` 
INT, `long` BIGINT, `boolean` BOOLEAN, `float` FLOAT, `double` DOUBLE, `bytes` 
BINARY, `string` STRING)"
                 + " STORED AS ORC", ddl);
     }
 
     @Test
     public void test_generateHiveDDL_complex() {
-        Schema avroSchema = buildComplexAvroSchema();
-        String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable", 
false);
-        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable "
-                + "(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, 
myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)"
+        RecordSchema schema = buildComplexRecordSchema();
+        String ddl = NiFiOrcUtils.generateHiveDDL(schema, "myHiveTable", 
false);
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `myHiveTable` "
+                + "(`myInt` INT, `myMap` MAP<STRING, DOUBLE>, `myEnum` STRING, 
`myLongOrFloat` UNIONTYPE<BIGINT, FLOAT>, `myIntList` ARRAY<INT>)"
                 + " STORED AS ORC", ddl);
     }
 
     @Test
     public void test_generateHiveDDL_complex_normalize() {
-        Schema avroSchema = buildComplexAvroSchema();
-        String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable", 
true);
-        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable "
-                + "(myint INT, mymap MAP<STRING, DOUBLE>, myenum STRING, 
mylongorfloat UNIONTYPE<BIGINT, FLOAT>, myintlist ARRAY<INT>)"
+        RecordSchema schema = buildComplexRecordSchema();
+        String ddl = NiFiOrcUtils.generateHiveDDL(schema, "myHiveTable", true);
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `myHiveTable` "
+                + "(`myint` INT, `mymap` MAP<STRING, DOUBLE>, `myenum` STRING, 
`mylongorfloat` UNIONTYPE<BIGINT, FLOAT>, `myintlist` ARRAY<INT>)"
                 + " STORED AS ORC", ddl);
     }
 
     @Test
     public void test_convertToORCObject() {
-        Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x", "y", 
"z");
-        List<Object> objects = Arrays.asList(new Utf8("Hello"), new 
GenericData.EnumSymbol(schema, "x"));
+        List<Object> objects = Arrays.asList("Hello", "x");
         objects.forEach((avroObject) -> {
             Object o = 
NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("uniontype<bigint,string>"),
 avroObject, true);
             assertTrue(o instanceof UnionObject);
@@ -297,7 +303,7 @@ public class TestNiFiOrcUtils {
     }
 
     @Test
-    public void test_getHiveTypeFromAvroType_complex_normalize() {
+    public void test_getHiveTypeFromFieldType_complex_normalize() {
         // Expected ORC types
         String[] expectedTypes = {
                 "INT",
@@ -307,22 +313,22 @@ public class TestNiFiOrcUtils {
                 "ARRAY<INT>"
         };
 
-        Schema testSchema = buildComplexAvroSchema();
-        List<Schema.Field> fields = testSchema.getFields();
+        RecordSchema testSchema = buildComplexRecordSchema();
+        List<RecordField> fields = testSchema.getFields();
         for (int i = 0; i < fields.size(); i++) {
-            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema(), true));
+            assertEquals(expectedTypes[i], 
NiFiOrcUtils.getHiveTypeFromFieldType(fields.get(i).getDataType(), true));
         }
 
         assertEquals("STRUCT<myint:INT, mymap:MAP<STRING, DOUBLE>, 
myenum:STRING, mylongorfloat:UNIONTYPE<BIGINT, FLOAT>, myintlist:ARRAY<INT>>",
-                NiFiOrcUtils.getHiveTypeFromAvroType(testSchema, true));
+                NiFiOrcUtils.getHiveSchema(testSchema, true));
     }
 
     //////////////////
     // Helper methods
     //////////////////
 
-    public static Schema buildPrimitiveAvroSchema() {
-        // Build a fake Avro record with all primitive types
+    public static RecordSchema buildPrimitiveRecordSchema() {
+        // Build a fake record with all primitive types
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("test.record").namespace("any.data").fields();
         builder.name("int").type().intType().noDefault();
         builder.name("long").type().longType().longDefault(1L);
@@ -331,12 +337,12 @@ public class TestNiFiOrcUtils {
         builder.name("double").type().doubleType().doubleDefault(0.0);
         builder.name("bytes").type().bytesType().noDefault();
         builder.name("string").type().stringType().stringDefault("default");
-        return builder.endRecord();
+        return AvroTypeUtil.createSchema(builder.endRecord());
     }
 
-    public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, 
boolean b, float f, double d, ByteBuffer bytes, String string) {
-        Schema schema = buildPrimitiveAvroSchema();
-        GenericData.Record row = new GenericData.Record(schema);
+    public static Record buildPrimitiveRecord(int i, long l, boolean b, float 
f, double d, ByteBuffer bytes, String string) {
+        RecordSchema schema = buildPrimitiveRecordSchema();
+        Map<String, Object> row = new HashMap<>();
         row.put("int", i);
         row.put("long", l);
         row.put("boolean", b);
@@ -344,7 +350,7 @@ public class TestNiFiOrcUtils {
         row.put("double", d);
         row.put("bytes", bytes);
         row.put("string", string);
-        return row;
+        return new MapRecord(schema, row);
     }
 
     public static TypeInfo buildPrimitiveOrcSchema() {
@@ -359,7 +365,7 @@ public class TestNiFiOrcUtils {
                         TypeInfoCreator.createString()));
     }
 
-    public static Schema buildComplexAvroSchema() {
+    public static RecordSchema buildComplexRecordSchema() {
         // Build a fake Avro record with nested  types
         final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("complex.record").namespace("any.data").fields();
         
builder.name("myInt").type().unionOf().nullType().and().intType().endUnion().nullDefault();
@@ -367,18 +373,18 @@ public class TestNiFiOrcUtils {
         builder.name("myEnum").type().enumeration("myEnum").symbols("ABC", "DEF", "XYZ").enumDefault("ABC");
         builder.name("myLongOrFloat").type().unionOf().longType().and().floatType().endUnion().noDefault();
         builder.name("myIntList").type().array().items().intType().noDefault();
-        return builder.endRecord();
+        return AvroTypeUtil.createSchema(builder.endRecord());
     }
 
-    public static GenericData.Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray) {
-        Schema schema = buildComplexAvroSchema();
-        GenericData.Record row = new GenericData.Record(schema);
+    public static Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray) {
+        RecordSchema schema = buildComplexRecordSchema();
+        Map<String, Object> row = new HashMap<>();
         row.put("myInt", i);
         row.put("myMap", m);
         row.put("myEnum", e);
         row.put("myLongOrFloat", unionVal);
         row.put("myIntList", intArray);
-        return row;
+        return new MapRecord(schema, row);
     }
 
     public static TypeInfo buildComplexOrcSchema() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc
new file mode 100644
index 0000000..d208b15
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "name": "nested_record_test",
+  "namespace": "org.apache.nifi",
+  "type": "record",
+  "fields": [
+       {
+      "name": "myField",
+      "type":
+      {
+        "name": "recordField",
+        "namespace": "org.apache.nifi",
+        "type": "record",
+        "fields": [
+          {
+            "name": "id",
+            "type": "long"
+          },
+          {
+            "name": "x",
+            "type": "string"
+          }
+        ]
+      }
+    }
+  ]
+}
\ No newline at end of file

Reply via email to