[ 
https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344499#comment-15344499
 ] 

ASF GitHub Bot commented on NIFI-1663:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/477#discussion_r68074551
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/orc/OrcUtils.java
 ---
    @@ -0,0 +1,443 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.util.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
    +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
    +import org.apache.orc.TypeDescription;
    +
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Utility methods for ORC support (conversion from Avro, conversion to 
Hive types, etc.).
    + */
    +public class OrcUtils {
    +
    +    public static void putToRowBatch(ColumnVector col, MutableInt 
vectorOffset, int rowNumber, Schema fieldSchema, Object o) {
    +        Schema.Type fieldType = fieldSchema.getType();
    +
    +        if (fieldType == null) {
    +            throw new IllegalArgumentException("Field type is null");
    +        }
    +
    +        if (Schema.Type.INT.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((LongColumnVector) col).vector[rowNumber] = (int) o;
    +            }
    +        } else if (Schema.Type.LONG.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((LongColumnVector) col).vector[rowNumber] = (long) o;
    +            }
    +        } else if (Schema.Type.BOOLEAN.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((LongColumnVector) col).vector[rowNumber] = ((boolean) o) 
? 1 : 0;
    +            }
    +        } else if (Schema.Type.BYTES.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ByteBuffer byteBuffer = ((ByteBuffer) o);
    +                int size = byteBuffer.remaining();
    +                byte[] buf = new byte[size];
    +                byteBuffer.get(buf, 0, size);
    +                ((BytesColumnVector) col).setVal(rowNumber, buf);
    +            }
    +        } else if (Schema.Type.DOUBLE.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((DoubleColumnVector) col).vector[rowNumber] = (double) o;
    +            }
    +        } else if (Schema.Type.FLOAT.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((DoubleColumnVector) col).vector[rowNumber] = (float) o;
    +            }
    +        } else if (Schema.Type.STRING.equals(fieldType) || 
Schema.Type.ENUM.equals(fieldType)) {
    +            if (o == null) {
    +                col.isNull[rowNumber] = true;
    +            } else {
    +                ((BytesColumnVector) col).setVal(rowNumber, 
o.toString().getBytes());
    +            }
    +        } else if (Schema.Type.UNION.equals(fieldType)) {
    +            // If the union only has one non-null type in it, it was 
flattened in the ORC schema
    +            if (col instanceof UnionColumnVector) {
    +                UnionColumnVector union = ((UnionColumnVector) col);
    +                Schema.Type avroType = 
OrcUtils.getAvroSchemaTypeOfObject(o);
    +                // Find the index in the union with the matching Avro type
    +                int unionIndex = -1;
    +                List<Schema> types = fieldSchema.getTypes();
    +                final int numFields = types.size();
    +                for (int i = 0; i < numFields && unionIndex == -1; i++) {
    +                    if (avroType.equals(types.get(i).getType())) {
    +                        unionIndex = i;
    +                    }
    +                }
    +                if (unionIndex == -1) {
    +                    throw new IllegalArgumentException("Object type " + 
avroType.getName() + " not found in union '" + fieldSchema.getName() + "'");
    +                }
    +
    +                // Need nested vector offsets
    +                MutableInt unionVectorOffset = new MutableInt(0);
    +                putToRowBatch(union.fields[unionIndex], unionVectorOffset, 
rowNumber, fieldSchema.getTypes().get(unionIndex), o);
    +            } else {
    +                // Find and use the non-null type from the union
    +                List<Schema> types = fieldSchema.getTypes();
    +                Schema effectiveType = null;
    +                for (Schema type : types) {
    +                    if (!Schema.Type.NULL.equals(type.getType())) {
    +                        effectiveType = type;
    +                        break;
    +                    }
    +                }
    +                putToRowBatch(col, vectorOffset, rowNumber, effectiveType, 
o);
    +            }
    +
    +        } else if (Schema.Type.ARRAY.equals(fieldType)) {
    +            Schema arrayType = fieldSchema.getElementType();
    +            ListColumnVector array = ((ListColumnVector) col);
    +            if (o instanceof int[]) {
    +                int[] intArray = (int[]) o;
    +                for (int i = 0; i < intArray.length; i++) {
    +                    ((LongColumnVector) 
array.child).vector[vectorOffset.getValue() + i] = intArray[i];
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = intArray.length;
    +                vectorOffset.add(intArray.length);
    +            } else if (o instanceof long[]) {
    +                long[] longArray = (long[]) o;
    +                for (int i = 0; i < longArray.length; i++) {
    +                    ((LongColumnVector) 
array.child).vector[vectorOffset.getValue() + i] = longArray[i];
    +                }
    +                array.offsets[rowNumber] = vectorOffset.longValue();
    +                array.lengths[rowNumber] = longArray.length;
    +                vectorOffset.add(longArray.length);
    +            } else if (o instanceof float[]) {
    --- End diff --
    
    Same comment applies here as above for the int[] / long[] cases.


> Add support for ORC format
> --------------------------
>
>                 Key: NIFI-1663
>                 URL: https://issues.apache.org/jira/browse/NIFI-1663
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> From the Hive/ORC wiki 
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC): 
> The Optimized Row Columnar (ORC) file format provides a highly efficient way 
> to store Hive data ... Using ORC files improves performance when Hive is 
> reading, writing, and processing data.
> As users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193, 
> etc.), NiFi should be able to support ORC file format to enable users to 
> efficiently store flow files for use by Hive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to