Repository: nifi Updated Branches: refs/heads/master 7cb39d636 -> ce25ae541
NIFI-5667: Add nested record support for PutORC NIFI-5667: Fixed default table name NIFI-5667: Fixed handling of binary types NIFI-5667: Added backticks in Hive DDL generation This closes #3057. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ce25ae54 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ce25ae54 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ce25ae54 Branch: refs/heads/master Commit: ce25ae54196a318cbfcdb4dfe178607f4ac135c6 Parents: 7cb39d6 Author: Matthew Burgess <[email protected]> Authored: Tue Oct 9 18:59:00 2018 -0400 Committer: Bryan Bende <[email protected]> Committed: Mon Oct 15 10:10:47 2018 -0400 ---------------------------------------------------------------------- .../serialization/record/MockRecordParser.java | 4 + .../hadoop/hive/ql/io/orc/NiFiOrcUtils.java | 498 +++++++++---------- .../org/apache/nifi/processors/orc/PutORC.java | 10 +- .../orc/record/ORCHDFSRecordWriter.java | 31 +- .../apache/nifi/processors/orc/PutORCTest.java | 40 +- .../apache/nifi/util/orc/TestNiFiOrcUtils.java | 142 +++--- .../src/test/resources/nested_record.avsc | 42 ++ 7 files changed, 419 insertions(+), 348 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java index 9b5441e..2f7c634 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java @@ -58,6 +58,10 @@ public class MockRecordParser extends AbstractControllerService implements Recor fields.add(new RecordField(fieldName, type.getDataType(), isNullable)); } + public void addSchemaField(final RecordField recordField) { + fields.add(recordField); + } + public void addRecord(Object... values) { records.add(values); } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java index 2e6d2ca..1418b1b 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java @@ -16,11 +16,6 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.util.Utf8; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -43,6 +38,15 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.MapDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.orc.MemoryManager; import org.apache.orc.OrcConf; import org.apache.orc.impl.MemoryManagerImpl; @@ -69,15 +73,9 @@ public class NiFiOrcUtils { if (o != null) { if (typeInfo instanceof UnionTypeInfo) { OrcUnion union = new OrcUnion(); - // Avro uses Utf8 and GenericData.EnumSymbol objects instead of Strings. This is handled in other places in the method, but here - // we need to determine the union types from the objects, so choose String.class if the object is one of those Avro classes - Class clazzToCompareTo = o.getClass(); - if (o instanceof org.apache.avro.util.Utf8 || o instanceof GenericData.EnumSymbol) { - clazzToCompareTo = String.class; - } // Need to find which of the union types correspond to the primitive object TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector( - ObjectInspectorFactory.getReflectionObjectInspector(clazzToCompareTo, ObjectInspectorFactory.ObjectInspectorOptions.JAVA)); + ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA)); List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos(); int index = 0; @@ -106,7 +104,7 @@ public class NiFiOrcUtils { if (o instanceof Double) { return new DoubleWritable((double) o); } - if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) { + if (o instanceof String) { return new Text(o.toString()); } if (o instanceof ByteBuffer) { @@ -126,10 +124,19 @@ public class NiFiOrcUtils { } if (o instanceof Object[]) { Object[] objArray = (Object[]) o; - TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); - return Arrays.stream(objArray) - .map(o1 -> convertToORCObject(listTypeInfo, o1, hiveFieldNames)) - .collect(Collectors.toList()); + if(TypeInfoFactory.binaryTypeInfo.equals(typeInfo)) { + byte[] dest = new byte[objArray.length]; + for(int i=0;i<objArray.length;i++) { + dest[i] = (byte) objArray[i]; + } + return new BytesWritable(dest); + } else { + // If not binary, assume a list of objects + TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); + return Arrays.stream(objArray) + .map(o1 -> convertToORCObject(listTypeInfo, o1, hiveFieldNames)) + .collect(Collectors.toList()); + } } if (o instanceof int[]) { int[] intArray = (int[]) o; @@ -163,19 +170,30 @@ public class NiFiOrcUtils { .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1, hiveFieldNames)) .collect(Collectors.toList()); } - if (o instanceof GenericData.Array) { - GenericData.Array array = ((GenericData.Array) o); - // The type information in this case is interpreted as a List - TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); - return array.stream().map((element) -> convertToORCObject(listTypeInfo, element, hiveFieldNames)).collect(Collectors.toList()); - } if (o instanceof List) { return o; } + if (o instanceof Record) { + Record record = (Record) o; + TypeInfo recordSchema = NiFiOrcUtils.getOrcSchema(record.getSchema(), hiveFieldNames); + List<RecordField> recordFields = record.getSchema().getFields(); + if (recordFields != null) { + Object[] fieldObjects = new Object[recordFields.size()]; + for (int i = 0; i < recordFields.size(); i++) { + RecordField field = recordFields.get(i); + DataType dataType = field.getDataType(); + Object fieldObject = record.getValue(field); + fieldObjects[i] = convertToORCObject(NiFiOrcUtils.getOrcField(dataType, hiveFieldNames), fieldObject, hiveFieldNames); + } + return NiFiOrcUtils.createOrcStruct(recordSchema, fieldObjects); + } + return null; + } if (o instanceof Map) { Map map = new HashMap(); - TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); - TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo(); + MapTypeInfo mapTypeInfo = ((MapTypeInfo) typeInfo); + TypeInfo keyInfo = mapTypeInfo.getMapKeyTypeInfo(); + TypeInfo valueInfo = mapTypeInfo.getMapValueTypeInfo(); // Unions are not allowed as key/value types, so if we convert the key and value objects, // they should return Writable objects ((Map) o).forEach((key, value) -> { @@ -188,21 +206,6 @@ public class NiFiOrcUtils { }); return map; } - if (o instanceof GenericData.Record) { - GenericData.Record record = (GenericData.Record) o; - TypeInfo recordSchema = NiFiOrcUtils.getOrcField(record.getSchema(), hiveFieldNames); - List<Schema.Field> recordFields = record.getSchema().getFields(); - if (recordFields != null) { - Object[] fieldObjects = new Object[recordFields.size()]; - for (int i = 0; i < recordFields.size(); i++) { - Schema.Field field = recordFields.get(i); - Schema fieldSchema = field.schema(); - Object fieldObject = record.get(field.name()); - fieldObjects[i] = NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldSchema, hiveFieldNames), fieldObject, hiveFieldNames); - } - return NiFiOrcUtils.createOrcStruct(recordSchema, fieldObjects); - } - } throw new IllegalArgumentException("Error converting object of type " + o.getClass().getName() + " to ORC type " + typeInfo.getTypeName()); } else { return null; @@ -234,247 +237,228 @@ public class NiFiOrcUtils { return name.replaceAll("[\\. ]", "_"); } - public static String generateHiveDDL(Schema avroSchema, String tableName, boolean hiveFieldNames) { - Schema.Type schemaType = avroSchema.getType(); - StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS "); + public static String generateHiveDDL(RecordSchema recordSchema, String tableName, boolean hiveFieldNames) { + StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS `"); sb.append(tableName); - sb.append(" ("); - if (Schema.Type.RECORD.equals(schemaType)) { - List<String> hiveColumns = new ArrayList<>(); - List<Schema.Field> fields = avroSchema.getFields(); - if (fields != null) { - hiveColumns.addAll( - fields.stream().map(field -> (hiveFieldNames ? field.name().toLowerCase() : field.name()) + " " - + getHiveTypeFromAvroType(field.schema(), hiveFieldNames)).collect(Collectors.toList())); - } - sb.append(StringUtils.join(hiveColumns, ", ")); - sb.append(") STORED AS ORC"); - return sb.toString(); - } else { - throw new IllegalArgumentException("Avro schema is of type " + schemaType.getName() + ", not RECORD"); + sb.append("` ("); + List<String> hiveColumns = new ArrayList<>(); + List<RecordField> fields = recordSchema.getFields(); + if (fields != null) { + hiveColumns.addAll( + fields.stream().map(field -> "`" + (hiveFieldNames ? field.getFieldName().toLowerCase() : field.getFieldName()) + "` " + + getHiveTypeFromFieldType(field.getDataType(), hiveFieldNames)).collect(Collectors.toList())); } - } + sb.append(StringUtils.join(hiveColumns, ", ")); + sb.append(") STORED AS ORC"); + return sb.toString(); + } - public static TypeInfo getOrcField(Schema fieldSchema, boolean hiveFieldNames) throws IllegalArgumentException { - Schema.Type fieldType = fieldSchema.getType(); - LogicalType logicalType = fieldSchema.getLogicalType(); - - switch (fieldType) { - case INT: - case LONG: - // Handle logical types - if (logicalType != null) { - if (LogicalTypes.date().equals(logicalType)) { - return TypeInfoFactory.dateTypeInfo; - } else if (LogicalTypes.timeMicros().equals(logicalType)) { - // Time micros isn't supported by our Record Field types (see AvroTypeUtil) - throw new IllegalArgumentException("time-micros is not a supported field type"); - } else if (LogicalTypes.timeMillis().equals(logicalType)) { - return TypeInfoFactory.intTypeInfo; - } else if (LogicalTypes.timestampMicros().equals(logicalType)) { - // Timestamp micros isn't supported by our Record Field types (see AvroTypeUtil) - throw new IllegalArgumentException("timestamp-micros is not a supported field type"); - } else if (LogicalTypes.timestampMillis().equals(logicalType)) { - return TypeInfoFactory.timestampTypeInfo; - } - } - return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType); - case BYTES: - // Handle logical types - if (logicalType != null) { - if (logicalType instanceof LogicalTypes.Decimal) { - return TypeInfoFactory.doubleTypeInfo; - } - } - return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType); - - case BOOLEAN: - case DOUBLE: - case FLOAT: - case STRING: - return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType); - - case UNION: - List<Schema> unionFieldSchemas = fieldSchema.getTypes(); - - if (unionFieldSchemas != null) { - // Ignore null types in union - List<TypeInfo> orcFields = unionFieldSchemas.stream().filter( - unionFieldSchema -> !Schema.Type.NULL.equals(unionFieldSchema.getType())) - .map((it) -> NiFiOrcUtils.getOrcField(it, hiveFieldNames)) - .collect(Collectors.toList()); - - // Flatten the field if the union only has one non-null element - if (orcFields.size() == 1) { - return orcFields.get(0); - } else { - return TypeInfoFactory.getUnionTypeInfo(orcFields); - } - } - return null; - - case ARRAY: - return TypeInfoFactory.getListTypeInfo(getOrcField(fieldSchema.getElementType(), hiveFieldNames)); - - case MAP: - return TypeInfoFactory.getMapTypeInfo( - getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING), - getOrcField(fieldSchema.getValueType(), hiveFieldNames)); - - case RECORD: - List<Schema.Field> avroFields = fieldSchema.getFields(); - if (avroFields != null) { - List<String> orcFieldNames = new ArrayList<>(avroFields.size()); - List<TypeInfo> orcFields = new ArrayList<>(avroFields.size()); - avroFields.forEach(avroField -> { - String fieldName = hiveFieldNames ? avroField.name().toLowerCase() : avroField.name(); - orcFieldNames.add(fieldName); - orcFields.add(getOrcField(avroField.schema(), hiveFieldNames)); - }); - return TypeInfoFactory.getStructTypeInfo(orcFieldNames, orcFields); - } - return null; + public static TypeInfo getOrcSchema(RecordSchema recordSchema, boolean hiveFieldNames) throws IllegalArgumentException { + List<RecordField> recordFields = recordSchema.getFields(); + if (recordFields != null) { + List<String> orcFieldNames = new ArrayList<>(recordFields.size()); + List<TypeInfo> orcFields = new ArrayList<>(recordFields.size()); + recordFields.forEach(recordField -> { + String fieldName = hiveFieldNames ? recordField.getFieldName().toLowerCase() : recordField.getFieldName(); + orcFieldNames.add(fieldName); + orcFields.add(getOrcField(recordField.getDataType(), hiveFieldNames)); + }); + return TypeInfoFactory.getStructTypeInfo(orcFieldNames, orcFields); + } + return null; + } - case ENUM: - // An enum value is just a String for ORC/Hive - return getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING); - default: - throw new IllegalArgumentException("Did not recognize Avro type " + fieldType.getName()); + public static TypeInfo getOrcField(DataType dataType, boolean hiveFieldNames) throws IllegalArgumentException { + if (dataType == null) { + return null; } - } + RecordFieldType fieldType = dataType.getFieldType(); + if (RecordFieldType.INT.equals(fieldType) + || RecordFieldType.LONG.equals(fieldType) + || RecordFieldType.BOOLEAN.equals(fieldType) + || RecordFieldType.DOUBLE.equals(fieldType) + || RecordFieldType.FLOAT.equals(fieldType) + || RecordFieldType.STRING.equals(fieldType)) { + return getPrimitiveOrcTypeFromPrimitiveFieldType(dataType); + } + if (RecordFieldType.DATE.equals(fieldType)) { + return TypeInfoFactory.dateTypeInfo; + } + if (RecordFieldType.TIME.equals(fieldType)) { + return TypeInfoFactory.intTypeInfo; + } + if (RecordFieldType.TIMESTAMP.equals(fieldType)) { + return TypeInfoFactory.timestampTypeInfo; + } + if (RecordFieldType.ARRAY.equals(fieldType)) { + ArrayDataType arrayDataType = (ArrayDataType) dataType; + if (RecordFieldType.BYTE.getDataType().equals(arrayDataType.getElementType())) { + return TypeInfoFactory.getPrimitiveTypeInfo("binary"); + } + return TypeInfoFactory.getListTypeInfo(getOrcField(arrayDataType.getElementType(), hiveFieldNames)); + } + if (RecordFieldType.CHOICE.equals(fieldType)) { + ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + List<DataType> unionFieldSchemas = choiceDataType.getPossibleSubTypes(); + + if (unionFieldSchemas != null) { + // Ignore null types in union + List<TypeInfo> orcFields = unionFieldSchemas.stream() + .map((it) -> NiFiOrcUtils.getOrcField(it, hiveFieldNames)) + .collect(Collectors.toList()); - public static Schema.Type getAvroSchemaTypeOfObject(Object o) { - if (o == null) { - return Schema.Type.NULL; - } else if (o instanceof Integer) { - return Schema.Type.INT; - } else if (o instanceof Long) { - return Schema.Type.LONG; - } else if (o instanceof Boolean) { - return Schema.Type.BOOLEAN; - } else if (o instanceof byte[]) { - return Schema.Type.BYTES; - } else if (o instanceof Float) { - return Schema.Type.FLOAT; - } else if (o instanceof Double) { - return Schema.Type.DOUBLE; - } else if (o instanceof Enum) { - return Schema.Type.ENUM; - } else if (o instanceof Object[]) { - return Schema.Type.ARRAY; - } else if (o instanceof List) { - return Schema.Type.ARRAY; - } else if (o instanceof Map) { - return Schema.Type.MAP; - } else { - throw new IllegalArgumentException("Object of class " + o.getClass() + " is not a supported Avro Type"); + // Flatten the field if the union only has one non-null element + if (orcFields.size() == 1) { + return orcFields.get(0); + } else { + return TypeInfoFactory.getUnionTypeInfo(orcFields); + } + } + return null; + } + if (RecordFieldType.MAP.equals(fieldType)) { + MapDataType mapDataType = (MapDataType) dataType; + return TypeInfoFactory.getMapTypeInfo( + getPrimitiveOrcTypeFromPrimitiveFieldType(RecordFieldType.STRING.getDataType()), + getOrcField(mapDataType.getValueType(), hiveFieldNames)); } + if (RecordFieldType.RECORD.equals(fieldType)) { + RecordDataType recordDataType = (RecordDataType) dataType; + List<RecordField> recordFields = recordDataType.getChildSchema().getFields(); + if (recordFields != null) { + List<String> orcFieldNames = new ArrayList<>(recordFields.size()); + List<TypeInfo> orcFields = new ArrayList<>(recordFields.size()); + recordFields.forEach(recordField -> { + String fieldName = hiveFieldNames ? recordField.getFieldName().toLowerCase() : recordField.getFieldName(); + orcFieldNames.add(fieldName); + orcFields.add(getOrcField(recordField.getDataType(), hiveFieldNames)); + }); + return TypeInfoFactory.getStructTypeInfo(orcFieldNames, orcFields); + } + return null; + } + + throw new IllegalArgumentException("Did not recognize field type " + fieldType.name()); } - public static TypeInfo getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type avroType) throws IllegalArgumentException { - if (avroType == null) { + public static TypeInfo getPrimitiveOrcTypeFromPrimitiveFieldType(DataType rawDataType) throws IllegalArgumentException { + if (rawDataType == null) { throw new IllegalArgumentException("Avro type is null"); } - switch (avroType) { - case INT: - return TypeInfoFactory.getPrimitiveTypeInfo("int"); - case LONG: - return TypeInfoFactory.getPrimitiveTypeInfo("bigint"); - case BOOLEAN: - return TypeInfoFactory.getPrimitiveTypeInfo("boolean"); - case BYTES: - return TypeInfoFactory.getPrimitiveTypeInfo("binary"); - case DOUBLE: - return TypeInfoFactory.getPrimitiveTypeInfo("double"); - case FLOAT: - return TypeInfoFactory.getPrimitiveTypeInfo("float"); - case STRING: - return TypeInfoFactory.getPrimitiveTypeInfo("string"); - default: - throw new IllegalArgumentException("Avro type " + avroType.getName() + " is not a primitive type"); + RecordFieldType fieldType = rawDataType.getFieldType(); + if (RecordFieldType.INT.equals(fieldType)) { + return TypeInfoFactory.getPrimitiveTypeInfo("int"); + } + if (RecordFieldType.LONG.equals(fieldType)) { + return TypeInfoFactory.getPrimitiveTypeInfo("bigint"); } + if (RecordFieldType.BOOLEAN.equals(fieldType)) { + return TypeInfoFactory.getPrimitiveTypeInfo("boolean"); + } + if (RecordFieldType.DOUBLE.equals(fieldType)) { + return TypeInfoFactory.getPrimitiveTypeInfo("double"); + } + if (RecordFieldType.FLOAT.equals(fieldType)) { + return TypeInfoFactory.getPrimitiveTypeInfo("float"); + } + if (RecordFieldType.STRING.equals(fieldType)) { + return TypeInfoFactory.getPrimitiveTypeInfo("string"); + } + + throw new IllegalArgumentException("Field type " + fieldType.name() + " is not a primitive type"); } - public static String getHiveTypeFromAvroType(Schema avroSchema, boolean hiveFieldNames) { - if (avroSchema == null) { - throw new IllegalArgumentException("Avro schema is null"); + public static String getHiveSchema(RecordSchema recordSchema, boolean hiveFieldNames) throws IllegalArgumentException { + List<RecordField> recordFields = recordSchema.getFields(); + if (recordFields != null) { + List<String> hiveFields = new ArrayList<>(recordFields.size()); + recordFields.forEach(recordField -> { + hiveFields.add((hiveFieldNames ? recordField.getFieldName().toLowerCase() : recordField.getFieldName()) + + ":" + getHiveTypeFromFieldType(recordField.getDataType(), hiveFieldNames)); + }); + return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">"; } + return null; + } - Schema.Type avroType = avroSchema.getType(); - LogicalType logicalType = avroSchema.getLogicalType(); + public static String getHiveTypeFromFieldType(DataType rawDataType, boolean hiveFieldNames) { + if (rawDataType == null) { + throw new IllegalArgumentException("Field type is null"); + } + RecordFieldType dataType = rawDataType.getFieldType(); - switch (avroType) { - case INT: - if (logicalType != null) { - if (LogicalTypes.date().equals(logicalType)) { - return "DATE"; - } - // Time-millis has no current corresponding Hive type, perhaps an INTERVAL type when that is fully supported. - } - return "INT"; - case LONG: - if (logicalType != null) { - if (LogicalTypes.timestampMillis().equals(logicalType)) { - return "TIMESTAMP"; - } - // Timestamp-micros and time-micros are not supported by our Record Field type system - } - return "BIGINT"; - case BOOLEAN: - return "BOOLEAN"; - case BYTES: - if (logicalType != null) { - if (logicalType instanceof LogicalTypes.Decimal) { - return "DOUBLE"; - } - } + if (RecordFieldType.INT.equals(dataType)) { + return "INT"; + } + if (RecordFieldType.LONG.equals(dataType)) { + return "BIGINT"; + } + if (RecordFieldType.BOOLEAN.equals(dataType)) { + return "BOOLEAN"; + } + if (RecordFieldType.DOUBLE.equals(dataType)) { + return "DOUBLE"; + } + if (RecordFieldType.FLOAT.equals(dataType)) { + return "FLOAT"; + } + if (RecordFieldType.STRING.equals(dataType)) { + return "STRING"; + } + if (RecordFieldType.DATE.equals(dataType)) { + return "DATE"; + } + if (RecordFieldType.TIME.equals(dataType)) { + return "INT"; + } + if (RecordFieldType.TIMESTAMP.equals(dataType)) { + return "TIMESTAMP"; + } + if (RecordFieldType.ARRAY.equals(dataType)) { + ArrayDataType arrayDataType = (ArrayDataType) rawDataType; + if (RecordFieldType.BYTE.getDataType().equals(arrayDataType.getElementType())) { return "BINARY"; - case DOUBLE: - return "DOUBLE"; - case FLOAT: - return "FLOAT"; - case STRING: - case ENUM: - return "STRING"; - case UNION: - List<Schema> unionFieldSchemas = avroSchema.getTypes(); - if (unionFieldSchemas != null) { - List<String> hiveFields = new ArrayList<>(); - for (Schema unionFieldSchema : unionFieldSchemas) { - Schema.Type unionFieldSchemaType = unionFieldSchema.getType(); - // Ignore null types in union - if (!Schema.Type.NULL.equals(unionFieldSchemaType)) { - hiveFields.add(getHiveTypeFromAvroType(unionFieldSchema, hiveFieldNames)); - } - } - // Flatten the field if the union only has one non-null element - return (hiveFields.size() == 1) - ? hiveFields.get(0) - : "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + ">"; + } + return "ARRAY<" + getHiveTypeFromFieldType(arrayDataType.getElementType(), hiveFieldNames) + ">"; + } + if (RecordFieldType.MAP.equals(dataType)) { + MapDataType mapDataType = (MapDataType) rawDataType; + return "MAP<STRING, " + getHiveTypeFromFieldType(mapDataType.getValueType(), hiveFieldNames) + ">"; + } + if (RecordFieldType.CHOICE.equals(dataType)) { + ChoiceDataType choiceDataType = (ChoiceDataType) rawDataType; + List<DataType> unionFieldSchemas = choiceDataType.getPossibleSubTypes(); + + if (unionFieldSchemas != null) { + // Ignore null types in union + List<String> hiveFields = unionFieldSchemas.stream() + .map((it) -> getHiveTypeFromFieldType(it, hiveFieldNames)) + .collect(Collectors.toList()); - } - break; - case MAP: - return "MAP<STRING, " + getHiveTypeFromAvroType(avroSchema.getValueType(), hiveFieldNames) + ">"; - case ARRAY: - return "ARRAY<" + getHiveTypeFromAvroType(avroSchema.getElementType(), hiveFieldNames) + ">"; - case RECORD: - List<Schema.Field> recordFields = avroSchema.getFields(); - if (recordFields != null) { - List<String> hiveFields = recordFields.stream().map( - recordField -> (hiveFieldNames ? recordField.name().toLowerCase() : recordField.name()) + ":" - + getHiveTypeFromAvroType(recordField.schema(), hiveFieldNames)).collect(Collectors.toList()); - return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">"; - } - break; - default: - break; + // Flatten the field if the union only has one non-null element + return (hiveFields.size() == 1) + ? hiveFields.get(0) + : "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + ">"; + } + return null; + } + + if (RecordFieldType.RECORD.equals(dataType)) { + RecordDataType recordDataType = (RecordDataType) rawDataType; + List<RecordField> recordFields = recordDataType.getChildSchema().getFields(); + if (recordFields != null) { + List<String> hiveFields = recordFields.stream().map( + recordField -> ("`" + (hiveFieldNames ? recordField.getFieldName().toLowerCase() : recordField.getFieldName()) + "`:" + + getHiveTypeFromFieldType(recordField.getDataType(), hiveFieldNames))).collect(Collectors.toList()); + return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">"; + } + return null; } - throw new IllegalArgumentException("Error converting Avro type " + avroType.getName() + " to Hive type"); + throw new IllegalArgumentException("Error converting Avro type " + dataType.name() + " to Hive type"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java index a0a5d13..9af566a 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.orc; -import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.CompressionKind; @@ -30,7 +29,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -157,19 +155,17 @@ public class PutORC extends AbstractPutHDFSRecord { public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path, final RecordSchema schema) throws IOException, SchemaNotFoundException { - final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); - final long stripeSize = context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue(); final int bufferSize = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final CompressionKind compressionType = CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue()); final boolean normalizeForHive = context.getProperty(HIVE_FIELD_NAMES).asBoolean(); - TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, normalizeForHive); + TypeInfo orcSchema = NiFiOrcUtils.getOrcSchema(schema, normalizeForHive); final Writer orcWriter = NiFiOrcUtils.createWriter(path, conf, orcSchema, stripeSize, compressionType, bufferSize); final String hiveTableName = context.getProperty(HIVE_TABLE_NAME).isSet() ? context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue() - : NiFiOrcUtils.normalizeHiveTableName(avroSchema.getFullName()); + : NiFiOrcUtils.normalizeHiveTableName(schema.getIdentifier().getName().orElse("unknown")); final boolean hiveFieldNames = context.getProperty(HIVE_FIELD_NAMES).asBoolean(); - return new ORCHDFSRecordWriter(orcWriter, avroSchema, hiveTableName, hiveFieldNames); + return new ORCHDFSRecordWriter(orcWriter, schema, hiveTableName, hiveFieldNames); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java index bd386a0..3adfb80 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/record/ORCHDFSRecordWriter.java @@ -16,13 +16,15 @@ */ package org.apache.nifi.processors.orc.record; -import org.apache.avro.Schema; import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils; import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import java.io.IOException; @@ -33,27 +35,27 @@ import java.util.Map; import static org.apache.nifi.processors.orc.PutORC.HIVE_DDL_ATTRIBUTE; /** - * HDFSRecordWriter that writes ORC files using Avro as the schema representation. + * HDFSRecordWriter that writes ORC files using the NiFi Record API as the schema representation. */ public class ORCHDFSRecordWriter implements HDFSRecordWriter { - private final Schema avroSchema; + private final RecordSchema recordSchema; private final TypeInfo orcSchema; private final Writer orcWriter; private final String hiveTableName; private final boolean hiveFieldNames; - private final List<Schema.Field> recordFields; + private final List<RecordField> recordFields; private final int numRecordFields; private Object[] workingRow; - public ORCHDFSRecordWriter(final Writer orcWriter, final Schema avroSchema, final String hiveTableName, final boolean hiveFieldNames) { - this.avroSchema = avroSchema; + public ORCHDFSRecordWriter(final Writer orcWriter, final RecordSchema recordSchema, final String hiveTableName, final boolean hiveFieldNames) { + this.recordSchema = recordSchema; this.orcWriter = orcWriter; this.hiveFieldNames = hiveFieldNames; - this.orcSchema = NiFiOrcUtils.getOrcField(avroSchema, this.hiveFieldNames); + this.orcSchema = NiFiOrcUtils.getOrcSchema(recordSchema, this.hiveFieldNames); this.hiveTableName = hiveTableName; - this.recordFields = avroSchema != null ? avroSchema.getFields() : null; + this.recordFields = recordSchema != null ? recordSchema.getFields() : null; this.numRecordFields = recordFields != null ? recordFields.size() : -1; // Reuse row object this.workingRow = numRecordFields > -1 ? new Object[numRecordFields] : null; @@ -63,17 +65,18 @@ public class ORCHDFSRecordWriter implements HDFSRecordWriter { public void write(final Record record) throws IOException { if (recordFields != null) { for (int i = 0; i < numRecordFields; i++) { - final Schema.Field field = recordFields.get(i); - final Schema fieldSchema = field.schema(); - final String fieldName = field.name(); - Object o = record.getValue(fieldName); + final RecordField field = recordFields.get(i); + final DataType fieldType = field.getDataType(); + final String fieldName = field.getFieldName(); + Object o = record.getValue(field); try { - workingRow[i] = NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldSchema, hiveFieldNames), o, hiveFieldNames); + workingRow[i] = NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldType, hiveFieldNames), o, hiveFieldNames); } catch (ArrayIndexOutOfBoundsException aioobe) { final String errorMsg = "Index out of bounds for column " + i + ", type " + fieldName + ", and object " + o.toString(); throw new IOException(errorMsg, aioobe); } } + orcWriter.addRow(NiFiOrcUtils.createOrcStruct(orcSchema, workingRow)); } } @@ -93,7 +96,7 @@ public class ORCHDFSRecordWriter implements HDFSRecordWriter { } // Add Hive DDL Attribute - String hiveDDL = NiFiOrcUtils.generateHiveDDL(avroSchema, hiveTableName, hiveFieldNames); + String hiveDDL = NiFiOrcUtils.generateHiveDDL(recordSchema, hiveTableName, hiveFieldNames); Map<String, String> attributes = new HashMap<String, String>() {{ put(HIVE_DDL_ATTRIBUTE, hiveDDL); }}; http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java index 3cc6fba..cbb1cf7 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java @@ -46,6 +46,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; @@ -162,7 +163,7 @@ public class PutORCTest { mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename); mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100"); mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE, - "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS ORC"); + "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS ORC"); // verify we generated a provenance event final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents(); @@ -233,7 +234,7 @@ public class PutORCTest { mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10"); // DDL will be created with field names normalized (lowercased, e.g.) for Hive by default mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE, - "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (id INT, timemillis INT, timestampmillis TIMESTAMP, dt DATE, dec DOUBLE) STORED AS ORC"); + "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DOUBLE) STORED AS ORC"); // verify we generated a provenance event final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents(); @@ -384,6 +385,41 @@ public class PutORCTest { Assert.assertFalse(tempAvroORCFile.exists()); } + @Test + public void testNestedRecords() throws Exception { + testRunner = TestRunners.newTestRunner(proc); + testRunner.setProperty(PutORC.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + testRunner.setProperty(PutORC.DIRECTORY, DIRECTORY); + + MockRecordParser readerFactory = new MockRecordParser(); + + final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/nested_record.avsc"), StandardCharsets.UTF_8); + schema = new Schema.Parser().parse(avroSchema); + + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField); + } + + Map<String,Object> nestedRecordMap = new HashMap<>(); + nestedRecordMap.put("id", 11088000000001615L); + nestedRecordMap.put("x", "Hello World!"); + + RecordSchema nestedRecordSchema = AvroTypeUtil.createSchema(schema.getField("myField").schema()); + MapRecord nestedRecord = new MapRecord(nestedRecordSchema, nestedRecordMap); + // This gets added in to its spot in the schema, which is already named "myField" + readerFactory.addRecord(nestedRecord); + + testRunner.addControllerService("mock-reader-factory", readerFactory); + testRunner.enableControllerService(readerFactory); + + testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory"); + + testRunner.enqueue("trigger"); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1); + } + private void verifyORCUsers(final Path orcUsers, final int numExpectedUsers) throws IOException { verifyORCUsers(orcUsers, numExpectedUsers, null); } http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java index 4682d76..bde6af8 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java @@ -20,7 +20,6 @@ package org.apache.nifi.util.orc; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; -import org.apache.avro.util.Utf8; import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils; import org.apache.hadoop.hive.serde2.objectinspector.UnionObject; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -31,6 +30,12 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.junit.Test; import java.nio.ByteBuffer; @@ -61,20 +66,19 @@ public class TestNiFiOrcUtils { }; // Build a fake Avro record with all types - Schema testSchema = buildPrimitiveAvroSchema(); - List<Schema.Field> fields = testSchema.getFields(); + RecordSchema testSchema = buildPrimitiveRecordSchema(); + List<RecordField> fields = testSchema.getFields(); for (int i = 0; i < fields.size(); i++) { - assertEquals(expectedTypes[i], NiFiOrcUtils.getOrcField(fields.get(i).schema(), false)); + assertEquals(expectedTypes[i], NiFiOrcUtils.getOrcField(fields.get(i).getDataType(), false)); } - } @Test public void test_getOrcField_union_optional_type() { final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields(); builder.name("union").type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault(); - Schema testSchema = builder.endRecord(); - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("union").schema(), false); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); + TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("union").get().getDataType(), false); assertEquals(TypeInfoCreator.createBoolean(), orcType); } @@ -82,8 +86,8 @@ public class TestNiFiOrcUtils { public void test_getOrcField_union() { final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields(); builder.name("union").type().unionOf().intType().and().booleanType().endUnion().noDefault(); - Schema testSchema = builder.endRecord(); - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("union").schema(), false); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); + TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("union").get().getDataType(), false); assertEquals( TypeInfoFactory.getUnionTypeInfo(Arrays.asList( TypeInfoCreator.createInt(), @@ -95,8 +99,8 @@ public class TestNiFiOrcUtils { public void test_getOrcField_map() { final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields(); builder.name("map").type().map().values().doubleType().noDefault(); - Schema testSchema = builder.endRecord(); - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("map").schema(), true); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); + TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("map").get().getDataType(), true); assertEquals( TypeInfoFactory.getMapTypeInfo( TypeInfoCreator.createString(), @@ -108,8 +112,8 @@ public class TestNiFiOrcUtils { public void test_getOrcField_nested_map() { final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields(); builder.name("map").type().map().values().map().values().doubleType().noDefault(); - Schema testSchema = builder.endRecord(); - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("map").schema(), false); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); + TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("map").get().getDataType(), false); assertEquals( TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(), TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(), TypeInfoCreator.createDouble())), @@ -120,8 +124,8 @@ public class TestNiFiOrcUtils { public void test_getOrcField_array() { final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields(); builder.name("array").type().array().items().longType().noDefault(); - Schema testSchema = builder.endRecord(); - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("array").schema(), false); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); + TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("array").get().getDataType(), false); assertEquals( TypeInfoFactory.getListTypeInfo(TypeInfoCreator.createLong()), orcType); @@ -131,8 +135,8 @@ public class TestNiFiOrcUtils { public void test_getOrcField_complex_array() { final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields(); builder.name("Array").type().array().items().map().values().floatType().noDefault(); - Schema testSchema = builder.endRecord(); - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("Array").schema(), true); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); + TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("Array").get().getDataType(), true); assertEquals( TypeInfoFactory.getListTypeInfo(TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(), TypeInfoCreator.createFloat())), orcType); @@ -144,9 +148,9 @@ public class TestNiFiOrcUtils { builder.name("Int").type().intType().noDefault(); builder.name("Long").type().longType().longDefault(1L); builder.name("Array").type().array().items().stringType().noDefault(); - Schema testSchema = builder.endRecord(); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); // Normalize field names for Hive, assert that their names are now lowercase - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema, true); + TypeInfo orcType = NiFiOrcUtils.getOrcSchema(testSchema, true); assertEquals( TypeInfoFactory.getStructTypeInfo( Arrays.asList("int", "long", "array"), @@ -161,13 +165,13 @@ public class TestNiFiOrcUtils { public void test_getOrcField_enum() { final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields(); builder.name("enumField").type().enumeration("enum").symbols("a", "b", "c").enumDefault("a"); - Schema testSchema = builder.endRecord(); - TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("enumField").schema(), true); + RecordSchema testSchema = AvroTypeUtil.createSchema(builder.endRecord()); + TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("enumField").get().getDataType(), true); assertEquals(TypeInfoCreator.createString(), orcType); } @Test - public void test_getPrimitiveOrcTypeFromPrimitiveAvroType() { + public void test_getPrimitiveOrcTypeFromPrimitiveFieldType() { // Expected ORC types TypeInfo[] expectedTypes = { TypeInfoCreator.createInt(), @@ -179,17 +183,20 @@ public class TestNiFiOrcUtils { TypeInfoCreator.createString(), }; - Schema testSchema = buildPrimitiveAvroSchema(); - List<Schema.Field> fields = testSchema.getFields(); + RecordSchema testSchema = buildPrimitiveRecordSchema(); + List<RecordField> fields = testSchema.getFields(); for (int i = 0; i < fields.size(); i++) { - assertEquals(expectedTypes[i], NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(fields.get(i).schema().getType())); + // Skip Binary as it is a primitive type in Avro but a complex type (array[byte]) in the NiFi Record API + if (i == 5) { + continue; + } + assertEquals(expectedTypes[i], NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveFieldType(fields.get(i).getDataType())); } } @Test(expected = IllegalArgumentException.class) - public void test_getPrimitiveOrcTypeFromPrimitiveAvroType_badType() { - Schema.Type nonPrimitiveType = Schema.Type.ARRAY; - NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(nonPrimitiveType); + public void test_getPrimitiveOrcTypeFromPrimitiveFieldType_badType() { + NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveFieldType(RecordFieldType.ARRAY.getDataType()); } @Test @@ -213,7 +220,7 @@ public class TestNiFiOrcUtils { } @Test - public void test_getHiveTypeFromAvroType_primitive() { + public void test_getHiveTypeFromFieldType_primitive() { // Expected ORC types String[] expectedTypes = { "INT", @@ -225,15 +232,15 @@ public class TestNiFiOrcUtils { "STRING", }; - Schema testSchema = buildPrimitiveAvroSchema(); - List<Schema.Field> fields = testSchema.getFields(); + RecordSchema testSchema = buildPrimitiveRecordSchema(); + List<RecordField> fields = testSchema.getFields(); for (int i = 0; i < fields.size(); i++) { - assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema(), false)); + assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromFieldType(fields.get(i).getDataType(), false)); } } @Test - public void test_getHiveTypeFromAvroType_complex() { + public void test_getHiveTypeFromFieldType_complex() { // Expected ORC types String[] expectedTypes = { "INT", @@ -243,46 +250,45 @@ public class TestNiFiOrcUtils { "ARRAY<INT>" }; - Schema testSchema = buildComplexAvroSchema(); - List<Schema.Field> fields = testSchema.getFields(); + RecordSchema testSchema = buildComplexRecordSchema(); + List<RecordField> fields = testSchema.getFields(); for (int i = 0; i < fields.size(); i++) { - assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema(), false)); + assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromFieldType(fields.get(i).getDataType(), false)); } assertEquals("STRUCT<myInt:INT, myMap:MAP<STRING, DOUBLE>, myEnum:STRING, myLongOrFloat:UNIONTYPE<BIGINT, FLOAT>, myIntList:ARRAY<INT>>", - NiFiOrcUtils.getHiveTypeFromAvroType(testSchema, false)); + NiFiOrcUtils.getHiveSchema(testSchema, false)); } @Test public void test_generateHiveDDL_primitive() { - Schema avroSchema = buildPrimitiveAvroSchema(); - String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable", false); - assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable (int INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, string STRING)" + RecordSchema schema = buildPrimitiveRecordSchema(); + String ddl = NiFiOrcUtils.generateHiveDDL(schema, "myHiveTable", false); + assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `myHiveTable` (`int` INT, `long` BIGINT, `boolean` BOOLEAN, `float` FLOAT, `double` DOUBLE, `bytes` BINARY, `string` STRING)" + " STORED AS ORC", ddl); } @Test public void test_generateHiveDDL_complex() { - Schema avroSchema = buildComplexAvroSchema(); - String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable", false); - assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable " - + "(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)" + RecordSchema schema = buildComplexRecordSchema(); + String ddl = NiFiOrcUtils.generateHiveDDL(schema, "myHiveTable", false); + assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `myHiveTable` " + + "(`myInt` INT, `myMap` MAP<STRING, DOUBLE>, `myEnum` STRING, `myLongOrFloat` UNIONTYPE<BIGINT, FLOAT>, `myIntList` ARRAY<INT>)" + " STORED AS ORC", ddl); } @Test public void test_generateHiveDDL_complex_normalize() { - Schema avroSchema = buildComplexAvroSchema(); - String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable", true); - assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable " - + "(myint INT, mymap MAP<STRING, DOUBLE>, myenum STRING, mylongorfloat UNIONTYPE<BIGINT, FLOAT>, myintlist ARRAY<INT>)" + RecordSchema schema = buildComplexRecordSchema(); + String ddl = NiFiOrcUtils.generateHiveDDL(schema, "myHiveTable", true); + assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `myHiveTable` " + + "(`myint` INT, `mymap` MAP<STRING, DOUBLE>, `myenum` STRING, `mylongorfloat` UNIONTYPE<BIGINT, FLOAT>, `myintlist` ARRAY<INT>)" + " STORED AS ORC", ddl); } @Test public void test_convertToORCObject() { - Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x", "y", "z"); - List<Object> objects = Arrays.asList(new Utf8("Hello"), new GenericData.EnumSymbol(schema, "x")); + List<Object> objects = Arrays.asList("Hello", "x"); objects.forEach((avroObject) -> { Object o = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("uniontype<bigint,string>"), avroObject, true); assertTrue(o instanceof UnionObject); @@ -297,7 +303,7 @@ public class TestNiFiOrcUtils { } @Test - public void test_getHiveTypeFromAvroType_complex_normalize() { + public void test_getHiveTypeFromFieldType_complex_normalize() { // Expected ORC types String[] expectedTypes = { "INT", @@ -307,22 +313,22 @@ public class TestNiFiOrcUtils { "ARRAY<INT>" }; - Schema testSchema = buildComplexAvroSchema(); - List<Schema.Field> fields = testSchema.getFields(); + RecordSchema testSchema = buildComplexRecordSchema(); + List<RecordField> fields = testSchema.getFields(); for (int i = 0; i < fields.size(); i++) { - assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema(), true)); + assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromFieldType(fields.get(i).getDataType(), true)); } assertEquals("STRUCT<myint:INT, mymap:MAP<STRING, DOUBLE>, myenum:STRING, mylongorfloat:UNIONTYPE<BIGINT, FLOAT>, myintlist:ARRAY<INT>>", - NiFiOrcUtils.getHiveTypeFromAvroType(testSchema, true)); + NiFiOrcUtils.getHiveSchema(testSchema, true)); } ////////////////// // Helper methods ////////////////// - public static Schema buildPrimitiveAvroSchema() { - // Build a fake Avro record with all primitive types + public static RecordSchema buildPrimitiveRecordSchema() { + // Build a fake record with all primitive types final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields(); builder.name("int").type().intType().noDefault(); builder.name("long").type().longType().longDefault(1L); @@ -331,12 +337,12 @@ public class TestNiFiOrcUtils { builder.name("double").type().doubleType().doubleDefault(0.0); builder.name("bytes").type().bytesType().noDefault(); builder.name("string").type().stringType().stringDefault("default"); - return builder.endRecord(); + return AvroTypeUtil.createSchema(builder.endRecord()); } - public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) { - Schema schema = buildPrimitiveAvroSchema(); - GenericData.Record row = new GenericData.Record(schema); + public static Record buildPrimitiveRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) { + RecordSchema schema = buildPrimitiveRecordSchema(); + Map<String, Object> row = new HashMap<>(); row.put("int", i); row.put("long", l); row.put("boolean", b); @@ -344,7 +350,7 @@ public class TestNiFiOrcUtils { row.put("double", d); row.put("bytes", bytes); row.put("string", string); - return row; + return new MapRecord(schema, row); } public static TypeInfo buildPrimitiveOrcSchema() { @@ -359,7 +365,7 @@ public class TestNiFiOrcUtils { TypeInfoCreator.createString())); } - public static Schema buildComplexAvroSchema() { + public static RecordSchema buildComplexRecordSchema() { // Build a fake Avro record with nested types final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("complex.record").namespace("any.data").fields(); builder.name("myInt").type().unionOf().nullType().and().intType().endUnion().nullDefault(); @@ -367,18 +373,18 @@ public class TestNiFiOrcUtils { builder.name("myEnum").type().enumeration("myEnum").symbols("ABC", "DEF", "XYZ").enumDefault("ABC"); builder.name("myLongOrFloat").type().unionOf().longType().and().floatType().endUnion().noDefault(); builder.name("myIntList").type().array().items().intType().noDefault(); - return builder.endRecord(); + return AvroTypeUtil.createSchema(builder.endRecord()); } - public static GenericData.Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray) { - Schema schema = buildComplexAvroSchema(); - GenericData.Record row = new GenericData.Record(schema); + public static Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray) { + RecordSchema schema = buildComplexRecordSchema(); + Map<String, Object> row = new HashMap<>(); row.put("myInt", i); row.put("myMap", m); row.put("myEnum", e); row.put("myLongOrFloat", unionVal); row.put("myIntList", intArray); - return row; + return new MapRecord(schema, row); } public static TypeInfo buildComplexOrcSchema() { http://git-wip-us.apache.org/repos/asf/nifi/blob/ce25ae54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc new file mode 100644 index 0000000..d208b15 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested_record.avsc @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "name": "nested_record_test", + "namespace": "org.apache.nifi", + "type": "record", + "fields": [ + { + "name": "myField", + "type": + { + "name": "recordField", + "namespace": "org.apache.nifi", + "type": "record", + "fields": [ + { + "name": "id", + "type": "long" + }, + { + "name": "x", + "type": "string" + } + ] + } + } + ] +} \ No newline at end of file
