[
https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405870#comment-15405870
]
ASF GitHub Bot commented on NIFI-1663:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/727#discussion_r73331884
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
+
+/**
+ * Utility methods for ORC support (conversion from Avro, conversion to Hive types, etc.)
+ */
+public class NiFiOrcUtils {
+
+ public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
+ if (o != null) {
+ if (typeInfo instanceof UnionTypeInfo) {
+ OrcUnion union = new OrcUnion();
+                // Need to find which of the union types corresponds to the primitive object
+                TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
+                        ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
+                List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
+
+ int index = 0;
+                while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
+ index++;
+ }
+ if (index < unionTypeInfos.size()) {
+                    union.set((byte) index, convertToORCObject(objectTypeInfo, o));
+                } else {
+                    throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
+ }
+ return union;
+ }
+ if (o instanceof Integer) {
+ return new IntWritable((int) o);
+ }
+ if (o instanceof Boolean) {
+ return new BooleanWritable((boolean) o);
+ }
+ if (o instanceof Long) {
+ return new LongWritable((long) o);
+ }
+ if (o instanceof Float) {
+ return new FloatWritable((float) o);
+ }
+ if (o instanceof Double) {
+ return new DoubleWritable((double) o);
+ }
+            if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
+ return new Text(o.toString());
+ }
+ if (o instanceof ByteBuffer) {
+ return new BytesWritable(((ByteBuffer) o).array());
+ }
+ if (o instanceof int[]) {
+ int[] intArray = (int[]) o;
+ return Arrays.stream(intArray)
+                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof long[]) {
+ long[] longArray = (long[]) o;
+ return Arrays.stream(longArray)
+                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof float[]) {
+ float[] floatArray = (float[]) o;
+ return IntStream.range(0, floatArray.length)
+ .mapToDouble(i -> floatArray[i])
+                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof double[]) {
+ double[] doubleArray = (double[]) o;
+ return Arrays.stream(doubleArray)
+                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof boolean[]) {
+ boolean[] booleanArray = (boolean[]) o;
+ return IntStream.range(0, booleanArray.length)
+ .map(i -> booleanArray[i] ? 1 : 0)
+                        .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1))
+ .collect(Collectors.toList());
+ }
+ if (o instanceof GenericData.Array) {
+ GenericData.Array array = ((GenericData.Array) o);
+                // The type information in this case is interpreted as a List
+                TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
+                return array.stream().map((element) -> convertToORCObject(listTypeInfo, element)).collect(Collectors.toList());
+ }
+ if (o instanceof List) {
+ return o;
+ }
+ if (o instanceof Map) {
+ MapWritable mapWritable = new MapWritable();
+                TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
+                TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
+                // Unions are not allowed as key/value types, so if we convert the key and value objects,
+                // they should return Writable objects
+ ((Map) o).forEach((key, value) -> {
+ Object keyObject = convertToORCObject(keyInfo, key);
+                    Object valueObject = convertToORCObject(valueInfo, value);
+ if (keyObject == null
+ || !(keyObject instanceof Writable)
+ || !(valueObject instanceof Writable)
+ ) {
+                        throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
+ }
+                    mapWritable.put((Writable) keyObject, (Writable) valueObject);
+ });
+ return mapWritable;
+ }
+
+ }
+ return null;
--- End diff ---
I'll take a look, probably should be an IllegalArgumentException or
something.
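For reference, a minimal sketch of what that change could look like, assuming the
method keeps its current shape (the exception message below is illustrative, not
from the patch): a non-null object that matches no conversion branch would fail
fast instead of silently becoming null, while null input would still map to null.

    public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
        if (o != null) {
            if (o instanceof Integer) {
                return new IntWritable((int) o);
            }
            // ... remaining type-dispatch branches as in the diff above ...
            // Fallthrough: no branch matched, so reject rather than return null
            throw new IllegalArgumentException("Error converting object of type "
                    + o.getClass().getName() + " to ORC type " + typeInfo.getTypeName());
        }
        return null;
    }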
> Add support for ORC format
> --------------------------
>
> Key: NIFI-1663
> URL: https://issues.apache.org/jira/browse/NIFI-1663
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> From the Hive/ORC wiki
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC):
> The Optimized Row Columnar (ORC) file format provides a highly efficient way
> to store Hive data ... Using ORC files improves performance when Hive is
> reading, writing, and processing data.
> As users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193,
> etc.), NiFi should be able to support ORC file format to enable users to
> efficiently store flow files for use by Hive.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)