[ https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405870#comment-15405870 ]

ASF GitHub Bot commented on NIFI-1663:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73331884
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
    @@ -0,0 +1,466 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
    +import org.apache.hadoop.io.BooleanWritable;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.io.DoubleWritable;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
    +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
    +
    +/**
    + * Utility methods for ORC support (e.g. conversion from Avro, conversion to Hive types).
    + */
    +public class NiFiOrcUtils {
    +
    +    public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
    +        if (o != null) {
    +            if (typeInfo instanceof UnionTypeInfo) {
    +                OrcUnion union = new OrcUnion();
    +                // Need to find which of the union types corresponds to the primitive object
    +                TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
    +                        ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
    +                List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
    +
    +                int index = 0;
    +                while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
    +                    index++;
    +                }
    +                if (index < unionTypeInfos.size()) {
    +                    union.set((byte) index, convertToORCObject(objectTypeInfo, o));
    +                } else {
    +                    throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
    +                }
    +                }
    +                return union;
    +            }
    +            if (o instanceof Integer) {
    +                return new IntWritable((int) o);
    +            }
    +            if (o instanceof Boolean) {
    +                return new BooleanWritable((boolean) o);
    +            }
    +            if (o instanceof Long) {
    +                return new LongWritable((long) o);
    +            }
    +            if (o instanceof Float) {
    +                return new FloatWritable((float) o);
    +            }
    +            if (o instanceof Double) {
    +                return new DoubleWritable((double) o);
    +            }
    +            if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
    +                return new Text(o.toString());
    +            }
    +            if (o instanceof ByteBuffer) {
    +                return new BytesWritable(((ByteBuffer) o).array());
    +            }
    +            if (o instanceof int[]) {
    +                int[] intArray = (int[]) o;
    +                return Arrays.stream(intArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof long[]) {
    +                long[] longArray = (long[]) o;
    +                return Arrays.stream(longArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof float[]) {
    +                float[] floatArray = (float[]) o;
    +                return IntStream.range(0, floatArray.length)
    +                        .mapToDouble(i -> floatArray[i])
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof double[]) {
    +                double[] doubleArray = (double[]) o;
    +                return Arrays.stream(doubleArray)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof boolean[]) {
    +                boolean[] booleanArray = (boolean[]) o;
    +                return IntStream.range(0, booleanArray.length)
    +                        .map(i -> booleanArray[i] ? 1 : 0)
    +                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1))
    +                        .collect(Collectors.toList());
    +            }
    +            if (o instanceof GenericData.Array) {
    +                GenericData.Array array = ((GenericData.Array) o);
    +                // The type information in this case is interpreted as a List
    +                TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
    +                return array.stream().map((element) -> convertToORCObject(listTypeInfo, element)).collect(Collectors.toList());
    +            }
    +            if (o instanceof List) {
    +                return o;
    +            }
    +            if (o instanceof Map) {
    +                MapWritable mapWritable = new MapWritable();
    +                TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
    +                TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
    +                // Unions are not allowed as key/value types, so if we convert the key and value objects,
    +                // they should return Writable objects
    +                ((Map) o).forEach((key, value) -> {
    +                    Object keyObject = convertToORCObject(keyInfo, key);
    +                    Object valueObject = convertToORCObject(valueInfo, value);
    +                    if (keyObject == null
    +                            || !(keyObject instanceof Writable)
    +                            || !(valueObject instanceof Writable)) {
    +                        throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
    +                    }
    +                    mapWritable.put((Writable) keyObject, (Writable) valueObject);
    +                });
    +                return mapWritable;
    +            }
    +
    +        }
    +        return null;
    --- End diff ---
    
    I'll take a look, probably should be an IllegalArgumentException or something.
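
    For context, here is a minimal sketch (an assumption about the shape of the fix, not the merged code) of what that fallthrough could look like: if o is non-null but matches none of the handled types, fail fast rather than silently return null, so a schema/data mismatch surfaces at conversion time instead of producing empty cells in the ORC file.

        public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
            if (o != null) {
                // ... type-specific conversions as in the diff above ...

                // Hypothetical fallthrough: no branch matched this non-null object,
                // so report the unconvertible type to the caller.
                throw new IllegalArgumentException("Error converting object of type "
                        + o.getClass().getName() + " to ORC type " + typeInfo.getTypeName());
            }
            // A null input still maps to a null ORC value.
            return null;
        }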


> Add support for ORC format
> --------------------------
>
>                 Key: NIFI-1663
>                 URL: https://issues.apache.org/jira/browse/NIFI-1663
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> From the Hive/ORC wiki 
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC): 
> The Optimized Row Columnar (ORC) file format provides a highly efficient way 
> to store Hive data ... Using ORC files improves performance when Hive is 
> reading, writing, and processing data.
> As users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193, 
> etc.), NiFi should be able to support ORC file format to enable users to 
> efficiently store flow files for use by Hive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
