NIFI-4963: Added Hive3 bundle
- Incorporated review comments
- Added more defensive code for PutHive3Streaming error handling

This closes #2755.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da99f873
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da99f873
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da99f873

Branch: refs/heads/master
Commit: da99f873a7d2f636465efd86178e578da75674a0
Parents: 8feac9a
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Mon Jun 4 11:37:48 2018 -0400
Committer: Bryan Bende <bbe...@apache.org>
Committed: Wed Jun 13 14:32:58 2018 -0400

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 nifi-assembly/pom.xml                           |  23 +
 .../nifi-hive-bundle/nifi-hive-nar/pom.xml      |   2 +
 .../nifi-hive-processors/pom.xml                |   7 +-
 .../nifi-hive-services-api-nar/pom.xml          |   2 +
 .../nifi-hive-services-api/pom.xml              |   6 +
 .../apache/nifi/dbcp/hive/Hive3DBCPService.java |  30 +
 .../nifi-hive-bundle/nifi-hive3-nar/pom.xml     |  49 ++
 .../src/main/resources/META-INF/NOTICE          | 349 ++++++++
 .../nifi-hive3-processors/pom.xml               | 140 +++
 .../hadoop/hive/ql/io/orc/NiFiOrcUtils.java     | 533 +++++++++++
 .../apache/hive/streaming/HiveRecordWriter.java | 106 +++
 .../apache/hive/streaming/NiFiRecordSerDe.java  | 282 ++++++
 .../nifi/dbcp/hive/Hive3ConnectionPool.java     | 385 ++++++++
 .../hive/AbstractHive3QLProcessor.java          | 348 ++++++++
 .../apache/nifi/processors/hive/PutHive3QL.java | 280 ++++++
 .../nifi/processors/hive/PutHive3Streaming.java | 560 ++++++++++++
 .../nifi/processors/hive/SelectHive3QL.java     | 477 ++++++++++
 .../org/apache/nifi/processors/orc/PutORC.java  | 175 ++++
 .../orc/record/ORCHDFSRecordWriter.java         | 110 +++
 .../hive/AuthenticationFailedException.java     |  23 +
 .../apache/nifi/util/hive/CsvOutputOptions.java |  63 ++
 .../apache/nifi/util/hive/HiveConfigurator.java | 119 +++
 .../apache/nifi/util/hive/HiveJdbcCommon.java   | 450 ++++++++++
 .../org/apache/nifi/util/hive/HiveOptions.java  | 117 +++
 .../org/apache/nifi/util/hive/HiveUtils.java    |  76 ++
 .../nifi/util/hive/ValidationResources.java     |  41 +
 ...org.apache.nifi.controller.ControllerService |  15 +
 .../org.apache.nifi.processor.Processor         |  18 +
 .../hive/streaming/StubConnectionError.java     |  31 +
 .../hive/streaming/StubSerializationError.java  |  23 +
 .../hive/streaming/StubStreamingIOFailure.java  |  28 +
 .../hive/streaming/StubTransactionError.java    |  27 +
 .../nifi/dbcp/hive/Hive3ConnectionPoolTest.java | 138 +++
 .../nifi/processors/hive/TestHive3Parser.java   | 292 ++++++
 .../nifi/processors/hive/TestPutHive3QL.java    | 792 +++++++++++++++++
 .../processors/hive/TestPutHive3Streaming.java  | 878 +++++++++++++++++++
 .../nifi/processors/hive/TestSelectHive3QL.java | 539 ++++++++++++
 .../apache/nifi/processors/orc/PutORCTest.java  | 416 +++++++++
 .../apache/nifi/util/orc/TestNiFiOrcUtils.java  | 437 +++++++++
 .../src/test/resources/array_of_records.avsc    |  38 +
 .../src/test/resources/core-site-security.xml   |  30 +
 .../src/test/resources/core-site.xml            |  22 +
 .../src/test/resources/fake.keytab              |   0
 .../src/test/resources/hive-site-security.xml   |  26 +
 .../src/test/resources/hive-site.xml            |  22 +
 .../src/test/resources/krb5.conf                |   0
 .../src/test/resources/user.avsc                |  26 +
 .../src/test/resources/user_logical_types.avsc  |  27 +
 nifi-nar-bundles/nifi-hive-bundle/pom.xml       |  59 +-
 50 files changed, 8587 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index d6c9b39..05351b5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -55,4 +55,4 @@ script:
   # Note: The reason the sed is done as part of script is to ensure the pom 
hack 
   # won't affect the 'clean install' above
   - bash .travis.sh
-  - mvn -T 2 clean install -Pcontrib-check,include-grpc,include-atlas 
-Ddir-only | grep -v -F -f .travis-output-filters && exit ${PIPESTATUS[0]}
+  - mvn -T 2 clean install 
-Pcontrib-check,include-grpc,include-atlas,include-hive3 -Ddir-only | grep -v 
-F -f .travis-output-filters && exit ${PIPESTATUS[0]}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index e610aa0..3f473c8 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -549,6 +549,12 @@ language governing permissions and limitations under the 
License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hive3-nar</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-site-to-site-reporting-nar</artifactId>
             <version>1.7.0-SNAPSHOT</version>
             <type>nar</type>
@@ -746,6 +752,23 @@ language governing permissions and limitations under the 
License. -->
             </dependencies>
         </profile>
         <profile>
+            <id>include-hive3</id>
+            <!-- This profile handles the inclusion of Hive 3 artifacts. The 
NAR
+            is quite large and makes the resultant binary distribution 
significantly
+            larger (275+ MB). -->
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-hive3-nar</artifactId>
+                    <version>1.7.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
             <id>rpm</id>
             <activation>
                 <activeByDefault>false</activeByDefault>

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
index 41e0159..cb2d60d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
@@ -28,6 +28,8 @@
     <properties>
         <maven.javadoc.skip>true</maven.javadoc.skip>
         <source.skip>true</source.skip>
+        <!-- Need to override hadoop.version here, for Hive and hadoop-client 
transitive dependencies -->
+        <hadoop.version>${hive.hadoop.version}</hadoop.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index f7b7b0b..4a6be6d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -24,13 +24,17 @@
 
     <artifactId>nifi-hive-processors</artifactId>
     <packaging>jar</packaging>
+
     <properties>
-        <hive.version>1.2.1</hive.version>
+        <!-- Need to override hadoop.version here, for Hive and hadoop-client 
transitive dependencies -->
+        <hadoop.version>${hive.hadoop.version}</hadoop.version>
     </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -84,6 +88,7 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
index 1060225..b0b9a4c 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api-nar/pom.xml
@@ -28,6 +28,8 @@
     <properties>
         <maven.javadoc.skip>true</maven.javadoc.skip>
         <source.skip>true</source.skip>
+        <!-- Need to override hadoop.version here, for Hive and hadoop-client 
transitive dependencies -->
+        <hadoop.version>${hive.hadoop.version}</hadoop.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
index 6d85c38..2db9b34 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/pom.xml
@@ -25,10 +25,16 @@
     <artifactId>nifi-hive-services-api</artifactId>
     <packaging>jar</packaging>
 
+    <properties>
+        <!-- Need to override hadoop.version here, for Hive and hadoop-client 
transitive dependencies -->
+        <hadoop.version>${hive.hadoop.version}</hadoop.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java
new file mode 100644
index 0000000..e3af3aa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-services-api/src/main/java/org/apache/nifi/dbcp/hive/Hive3DBCPService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.hive;
+
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+
+/**
+ * Definition for the Hive 3 Database Connection Pooling Service.
+ * Declares no methods of its own beyond those inherited from {@link HiveDBCPService}.
+ */
+@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
+@CapabilityDescription("Provides Database Connection Pooling Service for 
Apache Hive. Connections can be asked from pool and returned after usage.")
+public interface Hive3DBCPService extends HiveDBCPService {
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml
new file mode 100644
index 0000000..41286d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hive-bundle</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hive3-nar</artifactId>
+    <version>1.7.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+        <!-- Need to override hadoop.version here, for Hive and hadoop-client 
transitive dependencies -->
+        <hadoop.version>${hive3.hadoop.version}</hadoop.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hive-services-api-nar</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hive3-processors</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..9da3e38
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,349 @@
+nifi-hive3-nar
+Copyright 2014-2018 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This includes derived works from the Apache Storm (ASLv2 licensed) project 
(https://github.com/apache/storm):
+  Copyright 2015 The Apache Software Foundation
+  The derived work is adapted from
+    org/apache/storm/hive/common/HiveWriter.java
+    org/apache/storm/hive/common/HiveOptions.java
+  and can be found in the org.apache.nifi.util.hive package
+
+This includes derived works from the Apache Hive (ASLv2 licensed) project 
(https://github.com/apache/hive):
+  Copyright 2008-2016 The Apache Software Foundation
+  The derived work is adapted from
+    release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+  and can be found in the org.apache.hadoop.hive.ql.io.orc package
+  The derived work is adapted from
+    branch-3.0/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+  and can be found in the org.apache.hive.streaming.HiveRecordWriter class
+  The derived work is adapted from
+    branch-3.0/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java
+  and can be found in the org.apache.hive.streaming.NiFiRecordSerDe class
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Ant
+    The following NOTICE information applies:
+      Apache Ant
+      Copyright 1999-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Codec
+      The following NOTICE information applies:
+        Apache Commons Codec
+        Copyright 2002-2014 The Apache Software Foundation
+
+        src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+        contains test data from http://aspell.net/test/orig/batch0.tab.
+        Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org)
+
+        
===============================================================================
+
+        The content of package org.apache.commons.codec.language.bm has been 
translated
+        from the original php source code available at 
http://stevemorse.org/phoneticinfo.htm
+        with permission from the original authors.
+        Original source copyright:
+        Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons DBCP
+    The following NOTICE information applies:
+      Apache Commons DBCP
+      Copyright 2001-2015 The Apache Software Foundation.
+
+  (ASLv2) Apache Commons EL
+    The following NOTICE information applies:
+      Apache Commons EL
+      Copyright 1999-2016 The Apache Software Foundation
+
+      EL-8 patch - Copyright 2004-2007 Jamie Taylor
+      http://issues.apache.org/jira/browse/EL-8
+
+  (ASLv2) Apache HttpComponents
+      The following NOTICE information applies:
+        Apache HttpComponents Client
+        Copyright 1999-2016 The Apache Software Foundation
+        Apache HttpComponents Core - HttpCore
+        Copyright 2006-2009 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Logging
+    The following NOTICE information applies:
+      Apache Commons Logging
+      Copyright 2003-2014 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Pool
+    The following NOTICE information applies:
+      Apache Commons Pool
+      Copyright 1999-2009 The Apache Software Foundation.
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Hive
+    The following NOTICE information applies:
+      Apache Hive
+      Copyright 2008-2015 The Apache Software Foundation
+
+      This product includes software developed by The Apache Software
+      Foundation (http://www.apache.org/).
+
+      This product includes Jersey (https://jersey.java.net/)
+      Copyright (c) 2010-2014 Oracle and/or its affiliates.
+
+      This project includes software copyrighted by Microsoft Corporation and
+      licensed under the Apache License, Version 2.0.
+
+      This project includes software copyrighted by Dell SecureWorks and
+      licensed under the Apache License, Version 2.0.
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+       ## Credits
+
+       A list of contributors may be found from CREDITS file, which is included
+       in some artifacts (usually source distributions); but is always 
available
+       from the source code management (SCM) system project uses.
+
+  (ASLv2) BoneCP
+    The following NOTICE information applies:
+       BoneCP
+       Copyright 2010 Wallace Wadge
+
+  (ASLv2) Apache Hadoop
+    The following NOTICE information applies:
+      The binary distribution of this product bundles binaries of
+      org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which 
has the
+      following notices:
+      * Copyright 2011 Dain Sundstrom <d...@iq80.com>
+      * Copyright 2011 FuseSource Corp. http://fusesource.com
+
+      The binary distribution of this product bundles binaries of
+      org.fusesource.hawtjni:hawtjni-runtime 
(https://github.com/fusesource/hawtjni),
+      which has the following notices:
+      * This product includes software developed by FuseSource Corp.
+        http://fusesource.com
+      * This product includes software developed at
+        Progress Software Corporation and/or its  subsidiaries or affiliates.
+      * This product includes software developed by IBM Corporation and others.
+
+  (ASLv2) Apache HBase
+    The following NOTICE information applies:
+      Apache HBase
+      Copyright 2007-2015 The Apache Software Foundation
+
+      --
+      This product incorporates portions of the 'Hadoop' project
+
+      Copyright 2007-2009 The Apache Software Foundation
+
+      Licensed under the Apache License v2.0
+      --
+      Our Orca logo we got here: http://www.vectorfree.com/jumping-orca
+      It is licensed Creative Commons Attribution 3.0.
+      See https://creativecommons.org/licenses/by/3.0/us/
+      We changed the logo by stripping the colored background, inverting
+      it and then rotating it some.
+
+      Later we found that vectorfree.com image is not properly licensed.
+      The original is owned by vectorportal.com. The original was
+      relicensed so we could use it as Creative Commons Attribution 3.0.
+      The license is bundled with the download available here:
+      
http://www.vectorportal.com/subcategory/205/KILLER-WHALE-FREE-VECTOR.eps/ifile/9136/detailtest.asp
+      --
+      This product includes portions of the Bootstrap project v3.0.0
+
+      Copyright 2013 Twitter, Inc.
+
+      Licensed under the Apache License v2.0
+
+      This product uses the Glyphicons Halflings icon set.
+
+      http://glyphicons.com/
+
+      Copyright Jan Kovařík
+
+      Licensed under the Apache License v2.0 as a part of the Bootstrap 
project.
+
+      --
+      This product includes portions of the Guava project v14, specifically
+      
'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
+
+      Copyright (C) 2007 The Guava Authors
+
+      Licensed under the Apache License, Version 2.0
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+  (ASLv2) Apache Curator
+    The following NOTICE information applies:
+      Apache Curator
+      Copyright 2013-2014 The Apache Software Foundation
+
+  (ASLv2) Apache Derby
+    The following NOTICE information applies:
+      Apache Derby
+      Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, 
Apache JDO, Apache DDLUtils,
+      the Derby hat logo, the Apache JDO logo, and the Apache feather logo are 
trademarks of The Apache Software Foundation.
+
+  (ASLv2) Apache DS
+    The following NOTICE information applies:
+      ApacheDS
+      Copyright 2003-2015 The Apache Software Foundation
+
+  (ASLv2) Apache Geronimo
+    The following NOTICE information applies:
+      Apache Geronimo
+      Copyright 2003-2008 The Apache Software Foundation
+
+  (ASLv2) HTrace Core
+    The following NOTICE information applies:
+      In addition, this product includes software dependencies. See
+      the accompanying LICENSE.txt for a listing of dependencies
+      that are NOT Apache licensed (with pointers to their licensing)
+
+      Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
+      is a distributed tracing system that is Apache 2.0 Licensed.
+      Copyright 2012 Twitter, Inc.
+
+  (ASLv2) Jettison
+    The following NOTICE information applies:
+       Copyright 2006 Envoi Solutions LLC
+
+  (ASLv2) Jetty
+    The following NOTICE information applies:
+       Jetty Web Container
+       Copyright 1995-2017 Mort Bay Consulting Pty Ltd.
+
+  (ASLv2) Apache log4j
+    The following NOTICE information applies:
+      Apache log4j
+      Copyright 2007 The Apache Software Foundation
+
+  (ASLv2) Parquet MR
+    The following NOTICE information applies:
+      Parquet MR
+      Copyright 2012 Twitter, Inc.
+
+      This project includes code from https://github.com/lemire/JavaFastPFOR
+      
parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
+      Apache License Version 2.0 http://www.apache.org/licenses/.
+      (c) Daniel Lemire, http://lemire.me/en/
+
+  (ASLv2) Apache Thrift
+    The following NOTICE information applies:
+      Apache Thrift
+      Copyright 2006-2010 The Apache Software Foundation.
+
+  (ASLv2) Apache Twill
+    The following NOTICE information applies:
+      Apache Twill
+      Copyright 2013-2016 The Apache Software Foundation
+
+  (ASLv2) Dropwizard Metrics
+    The following NOTICE information applies:
+      Metrics
+      Copyright 2010-2013 Coda Hale and Yammer, Inc.
+
+      This product includes code derived from the JSR-166 project 
(ThreadLocalRandom, Striped64,
+      LongAdder), which was released with the following comments:
+
+          Written by Doug Lea with assistance from members of JCP JSR-166
+          Expert Group and released to the public domain, as explained at
+          http://creativecommons.org/publicdomain/zero/1.0/
+
+  (ASLv2) Joda Time
+      The following NOTICE information applies:
+        This product includes software developed by
+        Joda.org (http://www.joda.org/).
+
+  (ASLv2) The Netty Project
+      The following NOTICE information applies:
+        The Netty Project
+        Copyright 2011 The Netty Project
+
+  (ASLv2) Apache Tomcat
+      The following NOTICE information applies:
+        Apache Tomcat
+        Copyright 2007 The Apache Software Foundation
+
+          Java Management Extensions (JMX) support is provided by
+          the MX4J package, which is open source software.  The
+          original software and related information is available
+          at http://mx4j.sourceforge.net.
+
+          Java compilation software for JSP pages is provided by Eclipse,
+          which is open source software.  The orginal software and
+          related infomation is available at
+          http://www.eclipse.org.
+
+  (ASLv2) Apache ZooKeeper
+     The following NOTICE information applies:
+       Apache ZooKeeper
+       Copyright 2009-2012 The Apache Software Foundation
+
+  (ASLv2) Google GSON
+     The following NOTICE information applies:
+       Copyright 2008 Google Inc.
+
+  (ASLv2) JPam
+    The following NOTICE information applies:
+      Copyright 2003-2006 Greg Luck
+
+  ************************
+  Common Development and Distribution License 1.1
+  ************************
+
+  The following binary components are provided under the Common Development 
and Distribution License 1.1. See project link for details.
+
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-client 
(com.sun.jersey:jersey-client:jar:1.9 - https://jersey.java.net)
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.9 
- https://jersey.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.9 
- https://jersey.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-server 
(com.sun.jersey:jersey-server:jar:1.9 - https://jersey.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-guice 
(com.sun.jersey.contribs:jersey-guice:jar:1.9 - https://jersey.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding 
(javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) 
(javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail)
+
+
+  ************************
+  Common Development and Distribution License 1.0
+  ************************
+
+    The following binary components are provided under the Common Development 
and Distribution License 1.0.  See project link for details.
+
+      (CDDL 1.0) JavaServlet(TM) Specification 
(javax.servlet:servlet-api:jar:2.5 - no url available)
+      (CDDL 1.0) (GPL3) Streaming API For XML 
(javax.xml.stream:stax-api:jar:1.0-2 - no url provided)
+      (CDDL 1.0) JavaBeans Activation Framework (JAF) 
(javax.activation:activation:jar:1.1 - 
http://java.sun.com/products/javabeans/jaf/index.jsp)
+      (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - 
http://jsp.java.net)
+
+  *****************
+  Public Domain
+  *****************
+
+  The following binary components are provided to the 'Public Domain'.  See 
project link for details.
+
+      (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
new file mode 100644
index 0000000..c62268b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
@@ -0,0 +1,140 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-hive-bundle</artifactId>
        <version>1.7.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-hive3-processors</artifactId>
    <packaging>jar</packaging>

    <properties>
        <!-- Need to override hadoop.version here, for Hive and hadoop-client transitive dependencies -->
        <hadoop.version>${hive3.hadoop.version}</hadoop.version>
    </properties>

    <dependencies>
        <!-- NiFi framework and service APIs -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
            <version>1.7.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
            <version>1.7.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-dbcp-service-api</artifactId>
            <version>1.7.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-hive-services-api</artifactId>
            <version>1.7.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-hadoop-record-utils</artifactId>
            <version>1.7.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record-serialization-service-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- Hive 3 / Hadoop client libraries -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive3.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.json</groupId>
                    <artifactId>json</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-streaming</artifactId>
            <version>${hive3.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>${hive3.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.code.findbugs</groupId>
                    <artifactId>jsr305</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-hadoop-utils</artifactId>
            <version>1.7.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.github.stephenc.findbugs</groupId>
            <artifactId>findbugs-annotations</artifactId>
            <version>1.3.9-1</version>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock-record-utils</artifactId>
            <version>1.7.0-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <version>1.7.0-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
new file mode 100644
index 0000000..7231421
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.MemoryManager;
import org.apache.orc.OrcConf;
import org.apache.orc.impl.MemoryManagerImpl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+
+
+/**
+ * Utility methods for ORC support (conversion from Avro, conversion to Hive 
types, e.g.
+ */
+public class NiFiOrcUtils {
+
+    public static Object convertToORCObject(TypeInfo typeInfo, Object o, final 
boolean hiveFieldNames) {
+        if (o != null) {
+            if (typeInfo instanceof UnionTypeInfo) {
+                OrcUnion union = new OrcUnion();
+                // Avro uses Utf8 and GenericData.EnumSymbol objects instead 
of Strings. This is handled in other places in the method, but here
+                // we need to determine the union types from the objects, so 
choose String.class if the object is one of those Avro classes
+                Class clazzToCompareTo = o.getClass();
+                if (o instanceof org.apache.avro.util.Utf8 || o instanceof 
GenericData.EnumSymbol) {
+                    clazzToCompareTo = String.class;
+                }
+                // Need to find which of the union types correspond to the 
primitive object
+                TypeInfo objectTypeInfo = 
TypeInfoUtils.getTypeInfoFromObjectInspector(
+                        
ObjectInspectorFactory.getReflectionObjectInspector(clazzToCompareTo, 
ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
+                List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) 
typeInfo).getAllUnionObjectTypeInfos();
+
+                int index = 0;
+                while (index < unionTypeInfos.size() && 
!unionTypeInfos.get(index).equals(objectTypeInfo)) {
+                    index++;
+                }
+                if (index < unionTypeInfos.size()) {
+                    union.set((byte) index, convertToORCObject(objectTypeInfo, 
o, hiveFieldNames));
+                } else {
+                    throw new IllegalArgumentException("Object Type for class 
" + o.getClass().getName() + " not in Union declaration");
+                }
+                return union;
+            }
+            if (o instanceof Integer) {
+                return new IntWritable((int) o);
+            }
+            if (o instanceof Boolean) {
+                return new BooleanWritable((boolean) o);
+            }
+            if (o instanceof Long) {
+                return new LongWritable((long) o);
+            }
+            if (o instanceof Float) {
+                return new FloatWritable((float) o);
+            }
+            if (o instanceof Double) {
+                return new DoubleWritable((double) o);
+            }
+            if (o instanceof String || o instanceof Utf8 || o instanceof 
GenericData.EnumSymbol) {
+                return new Text(o.toString());
+            }
+            if (o instanceof ByteBuffer) {
+                return new BytesWritable(((ByteBuffer) o).array());
+            }
+            if (o instanceof Timestamp) {
+                return new TimestampWritable((Timestamp) o);
+            }
+            if (o instanceof Date) {
+                return new DateWritable((Date) o);
+            }
+            if (o instanceof Object[]) {
+                Object[] objArray = (Object[]) o;
+                TypeInfo listTypeInfo = ((ListTypeInfo) 
typeInfo).getListElementTypeInfo();
+                return Arrays.stream(objArray)
+                        .map(o1 -> convertToORCObject(listTypeInfo, o1, 
hiveFieldNames))
+                        .collect(Collectors.toList());
+            }
+            if (o instanceof int[]) {
+                int[] intArray = (int[]) o;
+                return Arrays.stream(intArray)
+                        .mapToObj((element) -> 
convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element, 
hiveFieldNames))
+                        .collect(Collectors.toList());
+            }
+            if (o instanceof long[]) {
+                long[] longArray = (long[]) o;
+                return Arrays.stream(longArray)
+                        .mapToObj((element) -> 
convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element, 
hiveFieldNames))
+                        .collect(Collectors.toList());
+            }
+            if (o instanceof float[]) {
+                float[] floatArray = (float[]) o;
+                return IntStream.range(0, floatArray.length)
+                        .mapToDouble(i -> floatArray[i])
+                        .mapToObj((element) -> 
convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) 
element, hiveFieldNames))
+                        .collect(Collectors.toList());
+            }
+            if (o instanceof double[]) {
+                double[] doubleArray = (double[]) o;
+                return Arrays.stream(doubleArray)
+                        .mapToObj((element) -> 
convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element, 
hiveFieldNames))
+                        .collect(Collectors.toList());
+            }
+            if (o instanceof boolean[]) {
+                boolean[] booleanArray = (boolean[]) o;
+                return IntStream.range(0, booleanArray.length)
+                        .map(i -> booleanArray[i] ? 1 : 0)
+                        .mapToObj((element) -> 
convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 
1, hiveFieldNames))
+                        .collect(Collectors.toList());
+            }
+            if (o instanceof GenericData.Array) {
+                GenericData.Array array = ((GenericData.Array) o);
+                // The type information in this case is interpreted as a List
+                TypeInfo listTypeInfo = ((ListTypeInfo) 
typeInfo).getListElementTypeInfo();
+                return array.stream().map((element) -> 
convertToORCObject(listTypeInfo, element, 
hiveFieldNames)).collect(Collectors.toList());
+            }
+            if (o instanceof List) {
+                return o;
+            }
+            if (o instanceof Map) {
+                Map map = new HashMap();
+                TypeInfo keyInfo = ((MapTypeInfo) 
typeInfo).getMapKeyTypeInfo();
+                TypeInfo valueInfo = ((MapTypeInfo) 
typeInfo).getMapValueTypeInfo();
+                // Unions are not allowed as key/value types, so if we convert 
the key and value objects,
+                // they should return Writable objects
+                ((Map) o).forEach((key, value) -> {
+                    Object keyObject = convertToORCObject(keyInfo, key, 
hiveFieldNames);
+                    Object valueObject = convertToORCObject(valueInfo, value, 
hiveFieldNames);
+                    if (keyObject == null) {
+                        throw new IllegalArgumentException("Maps' key cannot 
be null");
+                    }
+                    map.put(keyObject, valueObject);
+                });
+                return map;
+            }
+            if (o instanceof GenericData.Record) {
+                GenericData.Record record = (GenericData.Record) o;
+                TypeInfo recordSchema = 
NiFiOrcUtils.getOrcField(record.getSchema(), hiveFieldNames);
+                List<Schema.Field> recordFields = 
record.getSchema().getFields();
+                if (recordFields != null) {
+                    Object[] fieldObjects = new Object[recordFields.size()];
+                    for (int i = 0; i < recordFields.size(); i++) {
+                        Schema.Field field = recordFields.get(i);
+                        Schema fieldSchema = field.schema();
+                        Object fieldObject = record.get(field.name());
+                        fieldObjects[i] = 
NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldSchema, 
hiveFieldNames), fieldObject, hiveFieldNames);
+                    }
+                    return NiFiOrcUtils.createOrcStruct(recordSchema, 
fieldObjects);
+                }
+            }
+            throw new IllegalArgumentException("Error converting object of 
type " + o.getClass().getName() + " to ORC type " + typeInfo.getTypeName());
+        } else {
+            return null;
+        }
+    }
+
+
+    /**
+     * Create an object of OrcStruct given a TypeInfo and a list of objects
+     *
+     * @param typeInfo The TypeInfo object representing the ORC record schema
+     * @param objs     ORC objects/Writables
+     * @return an OrcStruct containing the specified objects for the specified 
schema
+     */
+    @SuppressWarnings("unchecked")
+    public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) 
{
+        SettableStructObjectInspector oi = (SettableStructObjectInspector) 
OrcStruct
+                .createObjectInspector(typeInfo);
+        List<StructField> fields = (List<StructField>) 
oi.getAllStructFieldRefs();
+        OrcStruct result = (OrcStruct) oi.create();
+        result.setNumFields(fields.size());
+        for (int i = 0; i < fields.size(); i++) {
+            oi.setStructFieldData(result, fields.get(i), objs[i]);
+        }
+        return result;
+    }
+
+    public static String normalizeHiveTableName(String name) {
+        return name.replaceAll("[\\. ]", "_");
+    }
+
+    public static String generateHiveDDL(Schema avroSchema, String tableName, 
boolean hiveFieldNames) {
+        Schema.Type schemaType = avroSchema.getType();
+        StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT 
EXISTS ");
+        sb.append(tableName);
+        sb.append(" (");
+        if (Schema.Type.RECORD.equals(schemaType)) {
+            List<String> hiveColumns = new ArrayList<>();
+            List<Schema.Field> fields = avroSchema.getFields();
+            if (fields != null) {
+                hiveColumns.addAll(
+                        fields.stream().map(field -> (hiveFieldNames ? 
field.name().toLowerCase() : field.name()) + " "
+                                + getHiveTypeFromAvroType(field.schema(), 
hiveFieldNames)).collect(Collectors.toList()));
+            }
+            sb.append(StringUtils.join(hiveColumns, ", "));
+            sb.append(") STORED AS ORC");
+            return sb.toString();
+        } else {
+            throw new IllegalArgumentException("Avro schema is of type " + 
schemaType.getName() + ", not RECORD");
+        }
+    }
+
+
+    public static TypeInfo getOrcField(Schema fieldSchema, boolean 
hiveFieldNames) throws IllegalArgumentException {
+        Schema.Type fieldType = fieldSchema.getType();
+        LogicalType logicalType = fieldSchema.getLogicalType();
+
+        switch (fieldType) {
+            case INT:
+            case LONG:
+                // Handle logical types
+                if (logicalType != null) {
+                    if (LogicalTypes.date().equals(logicalType)) {
+                        return TypeInfoFactory.dateTypeInfo;
+                    } else if (LogicalTypes.timeMicros().equals(logicalType)) {
+                        // Time micros isn't supported by our Record Field 
types (see AvroTypeUtil)
+                        throw new IllegalArgumentException("time-micros is not 
a supported field type");
+                    } else if (LogicalTypes.timeMillis().equals(logicalType)) {
+                        return TypeInfoFactory.intTypeInfo;
+                    } else if 
(LogicalTypes.timestampMicros().equals(logicalType)) {
+                        // Timestamp micros isn't supported by our Record 
Field types (see AvroTypeUtil)
+                        throw new IllegalArgumentException("timestamp-micros 
is not a supported field type");
+                    } else if 
(LogicalTypes.timestampMillis().equals(logicalType)) {
+                        return TypeInfoFactory.timestampTypeInfo;
+                    }
+                }
+                return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
+            case BYTES:
+                // Handle logical types
+                if (logicalType != null) {
+                    if (logicalType instanceof LogicalTypes.Decimal) {
+                        return TypeInfoFactory.doubleTypeInfo;
+                    }
+                }
+                return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
+
+            case BOOLEAN:
+            case DOUBLE:
+            case FLOAT:
+            case STRING:
+                return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
+
+            case UNION:
+                List<Schema> unionFieldSchemas = fieldSchema.getTypes();
+
+                if (unionFieldSchemas != null) {
+                    // Ignore null types in union
+                    List<TypeInfo> orcFields = 
unionFieldSchemas.stream().filter(
+                            unionFieldSchema -> 
!Schema.Type.NULL.equals(unionFieldSchema.getType()))
+                            .map((it) -> NiFiOrcUtils.getOrcField(it, 
hiveFieldNames))
+                            .collect(Collectors.toList());
+
+                    // Flatten the field if the union only has one non-null 
element
+                    if (orcFields.size() == 1) {
+                        return orcFields.get(0);
+                    } else {
+                        return TypeInfoFactory.getUnionTypeInfo(orcFields);
+                    }
+                }
+                return null;
+
+            case ARRAY:
+                return 
TypeInfoFactory.getListTypeInfo(getOrcField(fieldSchema.getElementType(), 
hiveFieldNames));
+
+            case MAP:
+                return TypeInfoFactory.getMapTypeInfo(
+                        
getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING),
+                        getOrcField(fieldSchema.getValueType(), 
hiveFieldNames));
+
+            case RECORD:
+                List<Schema.Field> avroFields = fieldSchema.getFields();
+                if (avroFields != null) {
+                    List<String> orcFieldNames = new 
ArrayList<>(avroFields.size());
+                    List<TypeInfo> orcFields = new 
ArrayList<>(avroFields.size());
+                    avroFields.forEach(avroField -> {
+                        String fieldName = hiveFieldNames ? 
avroField.name().toLowerCase() : avroField.name();
+                        orcFieldNames.add(fieldName);
+                        orcFields.add(getOrcField(avroField.schema(), 
hiveFieldNames));
+                    });
+                    return TypeInfoFactory.getStructTypeInfo(orcFieldNames, 
orcFields);
+                }
+                return null;
+
+            case ENUM:
+                // An enum value is just a String for ORC/Hive
+                return 
getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING);
+
+            default:
+                throw new IllegalArgumentException("Did not recognize Avro 
type " + fieldType.getName());
+        }
+
+    }
+
+    public static Schema.Type getAvroSchemaTypeOfObject(Object o) {
+        if (o == null) {
+            return Schema.Type.NULL;
+        } else if (o instanceof Integer) {
+            return Schema.Type.INT;
+        } else if (o instanceof Long) {
+            return Schema.Type.LONG;
+        } else if (o instanceof Boolean) {
+            return Schema.Type.BOOLEAN;
+        } else if (o instanceof byte[]) {
+            return Schema.Type.BYTES;
+        } else if (o instanceof Float) {
+            return Schema.Type.FLOAT;
+        } else if (o instanceof Double) {
+            return Schema.Type.DOUBLE;
+        } else if (o instanceof Enum) {
+            return Schema.Type.ENUM;
+        } else if (o instanceof Object[]) {
+            return Schema.Type.ARRAY;
+        } else if (o instanceof List) {
+            return Schema.Type.ARRAY;
+        } else if (o instanceof Map) {
+            return Schema.Type.MAP;
+        } else {
+            throw new IllegalArgumentException("Object of class " + 
o.getClass() + " is not a supported Avro Type");
+        }
+    }
+
+    public static TypeInfo 
getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type avroType) throws 
IllegalArgumentException {
+        if (avroType == null) {
+            throw new IllegalArgumentException("Avro type is null");
+        }
+        switch (avroType) {
+            case INT:
+                return TypeInfoFactory.getPrimitiveTypeInfo("int");
+            case LONG:
+                return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
+            case BOOLEAN:
+                return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
+            case BYTES:
+                return TypeInfoFactory.getPrimitiveTypeInfo("binary");
+            case DOUBLE:
+                return TypeInfoFactory.getPrimitiveTypeInfo("double");
+            case FLOAT:
+                return TypeInfoFactory.getPrimitiveTypeInfo("float");
+            case STRING:
+                return TypeInfoFactory.getPrimitiveTypeInfo("string");
+            default:
+                throw new IllegalArgumentException("Avro type " + 
avroType.getName() + " is not a primitive type");
+        }
+    }
+
+    public static String getHiveTypeFromAvroType(Schema avroSchema, boolean 
hiveFieldNames) {
+        if (avroSchema == null) {
+            throw new IllegalArgumentException("Avro schema is null");
+        }
+
+        Schema.Type avroType = avroSchema.getType();
+        LogicalType logicalType = avroSchema.getLogicalType();
+
+        switch (avroType) {
+            case INT:
+                if (logicalType != null) {
+                    if (LogicalTypes.date().equals(logicalType)) {
+                        return "DATE";
+                    }
+                    // Time-millis has no current corresponding Hive type, 
perhaps an INTERVAL type when that is fully supported.
+                }
+                return "INT";
+            case LONG:
+                if (logicalType != null) {
+                    if (LogicalTypes.timestampMillis().equals(logicalType)) {
+                        return "TIMESTAMP";
+                    }
+                    // Timestamp-micros and time-micros are not supported by 
our Record Field type system
+                }
+                return "BIGINT";
+            case BOOLEAN:
+                return "BOOLEAN";
+            case BYTES:
+                if (logicalType != null) {
+                    if (logicalType instanceof LogicalTypes.Decimal) {
+                        return "DOUBLE";
+                    }
+                }
+                return "BINARY";
+            case DOUBLE:
+                return "DOUBLE";
+            case FLOAT:
+                return "FLOAT";
+            case STRING:
+            case ENUM:
+                return "STRING";
+            case UNION:
+                List<Schema> unionFieldSchemas = avroSchema.getTypes();
+                if (unionFieldSchemas != null) {
+                    List<String> hiveFields = new ArrayList<>();
+                    for (Schema unionFieldSchema : unionFieldSchemas) {
+                        Schema.Type unionFieldSchemaType = 
unionFieldSchema.getType();
+                        // Ignore null types in union
+                        if (!Schema.Type.NULL.equals(unionFieldSchemaType)) {
+                            
hiveFields.add(getHiveTypeFromAvroType(unionFieldSchema, hiveFieldNames));
+                        }
+                    }
+                    // Flatten the field if the union only has one non-null 
element
+                    return (hiveFields.size() == 1)
+                            ? hiveFields.get(0)
+                            : "UNIONTYPE<" + StringUtils.join(hiveFields, ", 
") + ">";
+
+                }
+                break;
+            case MAP:
+                return "MAP<STRING, " + 
getHiveTypeFromAvroType(avroSchema.getValueType(), hiveFieldNames) + ">";
+            case ARRAY:
+                return "ARRAY<" + 
getHiveTypeFromAvroType(avroSchema.getElementType(), hiveFieldNames) + ">";
+            case RECORD:
+                List<Schema.Field> recordFields = avroSchema.getFields();
+                if (recordFields != null) {
+                    List<String> hiveFields = recordFields.stream().map(
+                            recordField -> (hiveFieldNames ? 
recordField.name().toLowerCase() : recordField.name()) + ":"
+                                    + 
getHiveTypeFromAvroType(recordField.schema(), 
hiveFieldNames)).collect(Collectors.toList());
+                    return "STRUCT<" + StringUtils.join(hiveFields, ", ") + 
">";
+                }
+                break;
+            default:
+                break;
+        }
+
+        throw new IllegalArgumentException("Error converting Avro type " + 
avroType.getName() + " to Hive type");
+    }
+
+
+    public static Writer createWriter(
+            Path path,
+            Configuration conf,
+            TypeInfo orcSchema,
+            long stripeSize,
+            CompressionKind compress,
+            int bufferSize) throws IOException {
+
+        int rowIndexStride = (int) OrcConf.ROW_INDEX_STRIDE.getLong(conf);
+
+        boolean addBlockPadding = OrcConf.BLOCK_PADDING.getBoolean(conf);
+
+        String versionName = OrcConf.WRITE_FORMAT.getString(conf);
+        OrcFile.Version versionValue = (versionName == null)
+                ? OrcFile.Version.CURRENT
+                : OrcFile.Version.byName(versionName);
+
+        OrcFile.EncodingStrategy encodingStrategy;
+        String enString = OrcConf.ENCODING_STRATEGY.getString(conf);
+        if (enString == null) {
+            encodingStrategy = OrcFile.EncodingStrategy.SPEED;
+        } else {
+            encodingStrategy = OrcFile.EncodingStrategy.valueOf(enString);
+        }
+
+        final double paddingTolerance = 
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf);
+
+        long blockSizeValue = OrcConf.BLOCK_SIZE.getLong(conf);
+
+        double bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(conf);
+
+        ObjectInspector inspector = OrcStruct.createObjectInspector(orcSchema);
+
+        OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
+                .rowIndexStride(rowIndexStride)
+                .blockPadding(addBlockPadding)
+                .version(versionValue)
+                .encodingStrategy(encodingStrategy)
+                .paddingTolerance(paddingTolerance)
+                .blockSize(blockSizeValue)
+                .bloomFilterFpp(bloomFilterFpp)
+                .memory(getMemoryManager(conf))
+                .inspector(inspector)
+                .stripeSize(stripeSize)
+                .bufferSize(bufferSize)
+                .compress(compress);
+
+        return OrcFile.createWriter(path, writerOptions);
+    }
+
+    private static MemoryManager memoryManager = null;
+
+    private static synchronized MemoryManager getMemoryManager(Configuration 
conf) {
+        if (memoryManager == null) {
+            memoryManager = new MemoryManagerImpl(conf);
+        }
+        return memoryManager;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
new file mode 100644
index 0000000..6edb374
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/HiveRecordWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+public class HiveRecordWriter extends AbstractRecordWriter {
+
+    private RecordReader recordReader;
+    private NiFiRecordSerDe serde;
+    private ComponentLog log;
+
+    public HiveRecordWriter(RecordReader recordReader, ComponentLog log) {
+        super(null);
+        this.recordReader = recordReader;
+        this.log = log;
+    }
+
+    @Override
+    public AbstractSerDe createSerde() throws SerializationError {
+        try {
+            Properties tableProps = table.getMetadata();
+            tableProps.setProperty(serdeConstants.LIST_COLUMNS, 
Joiner.on(",").join(inputColumns));
+            tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, 
Joiner.on(":").join(inputTypes));
+            NiFiRecordSerDe serde = new NiFiRecordSerDe(recordReader, log);
+            SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+            this.serde = serde;
+            return serde;
+        } catch (SerDeException e) {
+            throw new SerializationError("Error initializing serde " + 
NiFiRecordSerDe.class.getName(), e);
+        }
+    }
+
+    @Override
+    public Object encode(byte[] bytes) {
+        throw new UnsupportedOperationException(this.getClass().getName() + " 
does not support encoding of records via bytes, only via an InputStream");
+    }
+
+    @Override
+    public void write(long writeId, byte[] record) {
+        throw new UnsupportedOperationException(this.getClass().getName() + " 
does not support writing of records via bytes, only via an InputStream");
+    }
+
+    @Override
+    public void write(long writeId, InputStream inputStream) throws 
StreamingException {
+        // The inputStream is already available to the recordReader, so just 
iterate through the records
+        try {
+            Record record;
+            while ((record = recordReader.nextRecord()) != null) {
+                write(writeId, record);
+            }
+        } catch (MalformedRecordException | IOException e) {
+            throw new StreamingException(e.getLocalizedMessage(), e);
+        }
+    }
+
+    public Object encode(Record record) throws SerializationError {
+        try {
+            ObjectWritable blob = new ObjectWritable(record);
+            return serde.deserialize(blob);
+        } catch (SerDeException e) {
+            throw new SerializationError("Unable to convert Record into 
Object", e);
+        }
+    }
+
+    private void write(long writeId, Record record) throws StreamingException {
+        checkAutoFlush();
+        try {
+            Object encodedRow = encode(record);
+            int bucket = getBucket(encodedRow);
+            List<String> partitionValues = getPartitionValues(encodedRow);
+            getRecordUpdater(partitionValues, bucket).insert(writeId, 
encodedRow);
+            conn.getConnectionStats().incrementRecordsWritten();
+        } catch (IOException e) {
+            throw new StreamingIOFailure("Error writing record in transaction 
write id (" + writeId + ")", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
new file mode 100644
index 0000000..d4b444a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.TimestampParser;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+    protected RecordReader recordReader;
+    protected ComponentLog log;
+    protected List<String> columnNames;
+    protected StructTypeInfo schema;
+    protected SerDeStats stats;
+
+    protected StandardStructObjectInspector cachedObjectInspector;
+    protected TimestampParser tsParser;
+
+    private final static Pattern INTERNAL_PATTERN = 
Pattern.compile("_col([0-9]+)");
+
+    private Map<String, Integer> fieldPositionMap;
+
+    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
+        this.recordReader = recordReader;
+        this.log = log;
+    }
+
+    @Override
+    public void initialize(Configuration conf, Properties tbl) throws 
SerDeException {
+        List<TypeInfo> columnTypes;
+        StructTypeInfo rowTypeInfo;
+
+        log.debug("Initializing NiFiRecordSerDe: {}", 
tbl.entrySet().toArray());
+
+        // Get column names and types
+        String columnNameProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMNS);
+        String columnTypeProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+        final String columnNameDelimiter = 
tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : 
String.valueOf(SerDeUtils.COMMA);
+        // all table column names
+        if (columnNameProperty.isEmpty()) {
+            columnNames = new ArrayList<>(0);
+        } else {
+            columnNames = new 
ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+        }
+
+        // all column types
+        if (columnTypeProperty.isEmpty()) {
+            columnTypes = new ArrayList<>(0);
+        } else {
+            columnTypes = 
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+        }
+
+        log.debug("columns: {}, {}", new Object[]{columnNameProperty, 
columnNames});
+        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, 
columnTypes});
+
+        assert (columnNames.size() == columnTypes.size());
+
+        rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+        schema = rowTypeInfo;
+        log.debug("schema : {}", new Object[]{schema});
+        cachedObjectInspector = (StandardStructObjectInspector) 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
+        tsParser = new 
TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
+        // Populate mapping of field names to column positions
+        try {
+            populateFieldPositionMap();
+        } catch (MalformedRecordException | IOException e) {
+            throw new SerDeException(e);
+        }
+        stats = new SerDeStats();
+    }
+
+    @Override
+    public Class<? extends Writable> getSerializedClass() {
+        return ObjectWritable.class;
+    }
+
+    @Override
+    public Writable serialize(Object o, ObjectInspector objectInspector) 
throws SerDeException {
+        throw new UnsupportedOperationException("This SerDe only supports 
deserialization");
+    }
+
+    @Override
+    public SerDeStats getSerDeStats() {
+        return stats;
+    }
+
+    @Override
+    public Object deserialize(Writable writable) throws SerDeException {
+        ObjectWritable t = (ObjectWritable) writable;
+        Record record = (Record) t.get();
+        List<Object> r = new 
ArrayList<>(Collections.nCopies(columnNames.size(), null));
+        try {
+            RecordSchema recordSchema = record.getSchema();
+            for (RecordField field : recordSchema.getFields()) {
+                String fieldName = field.getFieldName();
+                String normalizedFieldName = fieldName.toLowerCase();
+
+                // Get column position of field name, and set field value there
+                Integer fpos = fieldPositionMap.get(normalizedFieldName);
+                if(fpos == null || fpos == -1) {
+                    // This is either a partition column or not a column in 
the target table, ignore either way
+                    continue;
+                }
+                Object currField = extractCurrentField(record, field, 
schema.getStructFieldTypeInfo(normalizedFieldName));
+                r.set(fpos, currField);
+            }
+            stats.setRowCount(stats.getRowCount() + 1);
+
+        } catch (Exception e) {
+            log.warn("Error [{}] parsing Record [{}].", new 
Object[]{e.getLocalizedMessage(), t}, e);
+            throw new SerDeException(e);
+        }
+
+        return r;
+    }
+
+    /**
+     * Utility method to extract current expected field from given JsonParser
+     * isTokenCurrent is a boolean variable also passed in, which determines
+     * if the JsonParser is already at the token we expect to read next, or
+     * needs advancing to the next before we read.
+     */
+    private Object extractCurrentField(Record record, RecordField field, 
TypeInfo fieldTypeInfo) {
+        Object val;
+        String fieldName = (field != null) ? field.getFieldName() : null;
+
+        switch (fieldTypeInfo.getCategory()) {
+            case PRIMITIVE:
+                PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = 
PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN;
+                if (fieldTypeInfo instanceof PrimitiveTypeInfo) {
+                    primitiveCategory = ((PrimitiveTypeInfo) 
fieldTypeInfo).getPrimitiveCategory();
+                }
+                switch (primitiveCategory) {
+                    case INT:
+                    case BYTE:
+                    case SHORT:
+                        val = record.getAsInt(fieldName);
+                        break;
+                    case LONG:
+                        val = record.getAsLong(fieldName);
+                        break;
+                    case BOOLEAN:
+                        val = record.getAsBoolean(fieldName);
+                        break;
+                    case FLOAT:
+                        val = record.getAsFloat(fieldName);
+                        break;
+                    case DOUBLE:
+                        val = record.getAsDouble(fieldName);
+                        break;
+                    case STRING:
+                    case VARCHAR:
+                    case CHAR:
+                        val = record.getAsString(fieldName);
+                        break;
+                    case BINARY:
+                        val = 
AvroTypeUtil.convertByteArray(record.getAsArray(fieldName)).array();
+                        break;
+                    case DATE:
+                        val = record.getAsDate(fieldName, 
field.getDataType().getFormat());
+                        break;
+                    case TIMESTAMP:
+                        val = 
DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> 
DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
+                        break;
+                    case DECIMAL:
+                        val = record.getAsDouble(fieldName);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Field " + 
fieldName + " cannot be converted to unknown type: " + 
primitiveCategory.name());
+                }
+                break;
+            case LIST:
+                val = Arrays.asList(record.getAsArray(fieldName));
+                break;
+            case MAP:
+                val = 
DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), 
field.getDataType());
+                break;
+            case STRUCT:
+                val = 
DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), 
field.getDataType());
+                break;
+            default:
+                log.error("Unknown type found: " + fieldTypeInfo + "for field 
of type: " + field.getDataType().toString());
+                return null;
+        }
+        return val;
+    }
+
+    @Override
+    public ObjectInspector getObjectInspector() {
+        return cachedObjectInspector;
+    }
+
+    private void populateFieldPositionMap() throws MalformedRecordException, 
IOException {
+        // Populate the mapping of field names to column positions only once
+        fieldPositionMap = new HashMap<>(columnNames.size());
+
+        RecordSchema recordSchema = recordReader.getSchema();
+        for (RecordField field : recordSchema.getFields()) {
+            String fieldName = field.getFieldName();
+            String normalizedFieldName = fieldName.toLowerCase();
+
+            int fpos = 
schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
+            if (fpos == -1) {
+                Matcher m = INTERNAL_PATTERN.matcher(fieldName);
+                fpos = m.matches() ? Integer.parseInt(m.group(1)) : -1;
+
+                log.debug("NPE finding position for field [{}] in schema [{}],"
+                        + " attempting to check if it is an internal column 
name like _col0", new Object[]{fieldName, schema});
+                if (fpos == -1) {
+                    // unknown field, we return. We'll continue from the next 
field onwards. Log at debug level because partition columns will be "unknown 
fields"
+                    log.debug("Field {} is not found in the target table, 
ignoring...", new Object[]{field.getFieldName()});
+                    continue;
+                }
+                // If we get past this, then the column name did match the 
hive pattern for an internal
+                // column name, such as _col0, etc, so it *MUST* match the 
schema for the appropriate column.
+                // This means people can't use arbitrary column names such as 
_col0, and expect us to ignore it
+                // if we find it.
+                if 
(!fieldName.equalsIgnoreCase(HiveConf.getColumnInternalName(fpos))) {
+                    log.error("Hive internal column name {} and position "
+                            + "encoding {} for the column name are at odds", 
new Object[]{fieldName, fpos});
+                    throw new IOException("Hive internal column name (" + 
fieldName
+                            + ") and position encoding (" + fpos
+                            + ") for the column name are at odds");
+                }
+                // If we reached here, then we were successful at finding an 
alternate internal
+                // column mapping, and we're about to proceed.
+            }
+            fieldPositionMap.put(normalizedFieldName, fpos);
+        }
+    }
+}

Reply via email to