[ https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344505#comment-15344505 ]

ASF GitHub Bot commented on NIFI-1663:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/477#discussion_r68074816
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/orc/OrcUtils.java ---
    @@ -0,0 +1,443 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.util.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
    +import org.apache.orc.TypeDescription;
    +
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Utility methods for ORC support (conversion from Avro, conversion to Hive types, etc.)
    + */
    +public class OrcUtils {
    +
    +    public static void putToRowBatch(ColumnVector col, MutableInt vectorOffset, int rowNumber, Schema fieldSchema, Object o) {
    +        Schema.Type fieldType = fieldSchema.getType();
    +
    +        if (fieldType == null) {
    +            throw new IllegalArgumentException("Field type is null");
    +        }
    +
    +        if (Schema.Type.INT.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((LongColumnVector) col).vector[rowNumber] = (int) o;
    +            }
    +        } else if (Schema.Type.LONG.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((LongColumnVector) col).vector[rowNumber] = (long) o;
    +            }
    +        } else if (Schema.Type.BOOLEAN.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((LongColumnVector) col).vector[rowNumber] = ((boolean) o) ? 1 : 0;
    +            }
    +        } else if (Schema.Type.BYTES.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ByteBuffer byteBuffer = ((ByteBuffer) o);
    +                int size = byteBuffer.remaining();
    +                byte[] buf = new byte[size];
    +                byteBuffer.get(buf, 0, size);
    +                ((BytesColumnVector) col).setVal(rowNumber, buf);
    +            }
    +        } else if (Schema.Type.DOUBLE.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((DoubleColumnVector) col).vector[rowNumber] = (double) o;
    +            }
    +        } else if (Schema.Type.FLOAT.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((DoubleColumnVector) col).vector[rowNumber] = (float) o;
    +            }
    +        } else if (Schema.Type.STRING.equals(fieldType) || Schema.Type.ENUM.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((BytesColumnVector) col).setVal(rowNumber, o.toString().getBytes());
    +            }
    +        } else if (Schema.Type.UNION.equals(fieldType)) {
    +            // If the union only has one non-null type in it, it was flattened in the ORC schema
    +            if (col instanceof UnionColumnVector) {
    +                UnionColumnVector union = ((UnionColumnVector) col);
    +                Schema.Type avroType = OrcUtils.getAvroSchemaTypeOfObject(o);
    +                // Find the index in the union with the matching Avro type
    +                int unionIndex = -1;
    +                List<Schema> types = fieldSchema.getTypes();
    +                final int numFields = types.size();
    +                for (int i = 0; i < numFields && unionIndex == -1; i++) {
    +                    if (avroType.equals(types.get(i).getType())) {
    +                        unionIndex = i;
    +                    }
    +                }
    +                if (unionIndex == -1) {
    +                    throw new IllegalArgumentException("Object type " + 
avroType.getName() + " not found in union '" + fieldSchema.getName() + "'");
    +                }
    +
    +                // Need nested vector offsets
    +                MutableInt unionVectorOffset = new MutableInt(0);
    +                putToRowBatch(union.fields[unionIndex], unionVectorOffset, rowNumber, fieldSchema.getTypes().get(unionIndex), o);
    +            } else {
    +                // Find and use the non-null type from the union
    +                List<Schema> types = fieldSchema.getTypes();
    +                Schema effectiveType = null;
    +                for (Schema type : types) {
    +                    if (!Schema.Type.NULL.equals(type.getType())) {
    +                        effectiveType = type;
    +                        break;
    +                    }
    +                }
    +                putToRowBatch(col, vectorOffset, rowNumber, effectiveType, o);
    +            }
    +
    +        } else if (Schema.Type.ARRAY.equals(fieldType)) {
    +            Schema arrayType = fieldSchema.getElementType();
    +            ListColumnVector array = ((ListColumnVector) col);
    +            if (o instanceof int[]) {
    +                int[] intArray = (int[]) o;
    +                for (int i = 0; i < intArray.length; i++) {
    +                    ((LongColumnVector) array.child).vector[vectorOffset.getValue() + i] = intArray[i];
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = intArray.length;
    +                vectorOffset.add(intArray.length);
    +            } else if (o instanceof long[]) {
    +                long[] longArray = (long[]) o;
    +                for (int i = 0; i < longArray.length; i++) {
    +                    ((LongColumnVector) array.child).vector[vectorOffset.getValue() + i] = longArray[i];
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = longArray.length;
    +                vectorOffset.add(longArray.length);
    +            } else if (o instanceof float[]) {
    +                float[] floatArray = (float[]) o;
    +                for (int i = 0; i < floatArray.length; i++) {
    +                    ((DoubleColumnVector) array.child).vector[vectorOffset.getValue() + i] = floatArray[i];
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = floatArray.length;
    +                vectorOffset.add(floatArray.length);
    +            } else if (o instanceof double[]) {
    +                double[] doubleArray = (double[]) o;
    +                for (int i = 0; i < doubleArray.length; i++) {
    +                    ((DoubleColumnVector) array.child).vector[vectorOffset.getValue() + i] = doubleArray[i];
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = doubleArray.length;
    +                vectorOffset.add(doubleArray.length);
    +            } else if (o instanceof String[]) {
    +                String[] stringArray = (String[]) o;
    +                BytesColumnVector byteCol = ((BytesColumnVector) array.child);
    +                for (int i = 0; i < stringArray.length; i++) {
    +                    if (stringArray[i] == null) {
    +                        byteCol.isNull[rowNumber] = true;
    +                    } else {
    +                        byteCol.setVal(vectorOffset.getValue() + i, stringArray[i].getBytes());
    +                    }
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = stringArray.length;
    +                vectorOffset.add(stringArray.length);
    +            } else if (o instanceof Map[]) {
    +                Map[] mapArray = (Map[]) o;
    +                MutableInt mapVectorOffset = new MutableInt(0);
    +                for (int i = 0; i < mapArray.length; i++) {
    +                    if (mapArray[i] == null) {
    +                        array.child.isNull[rowNumber] = true;
    +                    } else {
    +                        putToRowBatch(array.child, mapVectorOffset, vectorOffset.getValue() + i, arrayType, mapArray[i]);
    +                    }
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = mapArray.length;
    +                vectorOffset.add(mapArray.length);
    +            } else if (o instanceof List) {
    +                List listArray = (List) o;
    +                MutableInt listVectorOffset = new MutableInt(0);
    +                int numElements = listArray.size();
    +                for (int i = 0; i < numElements; i++) {
    +                    if (listArray.get(i) == null) {
    +                        array.child.isNull[rowNumber] = true;
    +                    } else {
    +                        putToRowBatch(array.child, listVectorOffset, vectorOffset.getValue() + i, arrayType, listArray.get(i));
    +                    }
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = numElements;
    +                vectorOffset.add(numElements);
    +
    +            } else {
    +                throw new IllegalArgumentException("Object class " + 
o.getClass().getName() + " not supported as an ORC list/array");
    +            }
    +
    +        } else if (Schema.Type.MAP.equals(fieldType)) {
    +            MapColumnVector map = ((MapColumnVector) col);
    +
    +            // Avro maps require String keys
    +            @SuppressWarnings("unchecked")
    +            Map<String, ?> mapObj = (Map<String, ?>) o;
    +            int effectiveRowNumber = vectorOffset.getValue();
    +            for (Map.Entry<String, ?> entry : mapObj.entrySet()) {
    +                putToRowBatch(map.keys, vectorOffset, effectiveRowNumber, Schema.create(Schema.Type.STRING), entry.getKey());
    +                putToRowBatch(map.values, vectorOffset, effectiveRowNumber, fieldSchema.getValueType(), entry.getValue());
    +                effectiveRowNumber++;
    +            }
    +            map.offsets[rowNumber] = vectorOffset.longValue();
    +            map.lengths[rowNumber] = mapObj.size();
    +            vectorOffset.add(mapObj.size());
    +
    +        } else {
    +            throw new IllegalArgumentException("Field type " + 
fieldType.getName() + " not recognized");
    +        }
    +
    +    }
    +
    +    public static String normalizeHiveTableName(String name) {
    +        return name.replaceAll("[\\. ]", "_");
    +    }
    +
    +    public static String generateHiveDDL(Schema avroSchema, String tableName) {
    +        Schema.Type schemaType = avroSchema.getType();
    +        StringBuffer sb = new StringBuffer("CREATE EXTERNAL TABLE IF NOT EXISTS ");
    +        sb.append(tableName);
    +        sb.append(" (");
    +        if (Schema.Type.RECORD.equals(schemaType)) {
    +            List<String> hiveColumns = new ArrayList<>();
    +            List<Schema.Field> fields = avroSchema.getFields();
    +            if (fields != null) {
    +                for (Schema.Field field : fields) {
    +                    hiveColumns.add(field.name() + " " + getHiveTypeFromAvroType(field.schema()));
    +                }
    +            }
    +            sb.append(StringUtils.join(hiveColumns, ", "));
    +            sb.append(") STORED AS ORC");
    +            return sb.toString();
    +        } else {
    +            throw new IllegalArgumentException("Avro schema is of type " + 
schemaType.getName() + ", not RECORD");
    +        }
    +    }
    +
    +
    +    public static void addOrcField(TypeDescription orcSchema, Schema.Field avroField) {
    +        Schema fieldSchema = avroField.schema();
    +        String fieldName = avroField.name();
    +
    +        orcSchema.addField(fieldName, getOrcField(fieldSchema));
    +    }
    +
    +    public static TypeDescription getOrcField(Schema fieldSchema) throws IllegalArgumentException {
    +        Schema.Type fieldType = fieldSchema.getType();
    +
    +        if (Schema.Type.INT.equals(fieldType)
    --- End diff --
    
    Would recommend we switch this to a switch/case statement
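    
    For illustration only, a minimal sketch of what the scalar branches could look like as a switch over the Schema.Type enum (the class name OrcUtilsSwitchSketch is hypothetical, only a few of the branches from the diff are shown, and the UNION/ARRAY/MAP handling would keep its own null logic rather than the hoisted check used here):
    
        import org.apache.avro.Schema;
        import org.apache.commons.lang3.mutable.MutableInt;
        import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
        import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
        import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    
        public class OrcUtilsSwitchSketch {
    
            public static void putToRowBatch(ColumnVector col, MutableInt vectorOffset, int rowNumber, Schema fieldSchema, Object o) {
                final Schema.Type fieldType = fieldSchema.getType();
                if (fieldType == null) {
                    throw new IllegalArgumentException("Field type is null");
                }
                if (o == null) {
                    // Null handling is identical in every scalar branch, so it can be hoisted here
                    col.isNull[rowNumber] = true;
                    return;
                }
                switch (fieldType) {
                    case INT:
                        ((LongColumnVector) col).vector[rowNumber] = (int) o;
                        break;
                    case LONG:
                        ((LongColumnVector) col).vector[rowNumber] = (long) o;
                        break;
                    case BOOLEAN:
                        ((LongColumnVector) col).vector[rowNumber] = ((boolean) o) ? 1 : 0;
                        break;
                    case DOUBLE:
                        ((DoubleColumnVector) col).vector[rowNumber] = (double) o;
                        break;
                    case FLOAT:
                        ((DoubleColumnVector) col).vector[rowNumber] = (float) o;
                        break;
                    // ... remaining cases (BYTES, STRING/ENUM, UNION, ARRAY, MAP) as in the diff above ...
                    default:
                        throw new IllegalArgumentException("Field type " + fieldType.getName() + " not recognized");
                }
            }
        }
    
    The same style could also apply to getOrcField, which dispatches on the same enum.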


> Add support for ORC format
> --------------------------
>
>                 Key: NIFI-1663
>                 URL: https://issues.apache.org/jira/browse/NIFI-1663
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> From the Hive/ORC wiki 
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC): 
> The Optimized Row Columnar (ORC) file format provides a highly efficient way 
> to store Hive data ... Using ORC files improves performance when Hive is 
> reading, writing, and processing data.
> As users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193, 
> etc.), NiFi should be able to support ORC file format to enable users to 
> efficiently store flow files for use by Hive.


