NIFI-981: Added SelectHiveQL and PutHiveQL processors

This closes #384.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/59bc8264
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/59bc8264
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/59bc8264

Branch: refs/heads/0.x
Commit: 59bc8264749465af224eb003786ec4bd504f10da
Parents: 9fdb315
Author: Matt Burgess <mattyb...@apache.org>
Authored: Tue May 3 13:11:54 2016 -0400
Committer: Bryan Bende <bbe...@apache.org>
Committed: Tue May 3 13:41:44 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   5 +
 .../src/main/asciidoc/getting-started.adoc      |   2 +
 .../nifi-hive-bundle/nifi-hive-nar/pom.xml      |  46 ++
 .../src/main/resources/META-INF/NOTICE          | 330 +++++++++++
 .../nifi-hive-processors/pom.xml                | 165 ++++++
 .../nifi/dbcp/hive/HiveConnectionPool.java      | 353 ++++++++++++
 .../apache/nifi/dbcp/hive/HiveDBCPService.java  |  32 ++
 .../hive/AbstractHiveQLProcessor.java           |  35 ++
 .../apache/nifi/processors/hive/PutHiveQL.java  | 314 +++++++++++
 .../nifi/processors/hive/SelectHiveQL.java      | 229 ++++++++
 .../apache/nifi/util/hive/HiveJdbcCommon.java   | 330 +++++++++++
 ...org.apache.nifi.controller.ControllerService |  15 +
 .../org.apache.nifi.processor.Processor         |  16 +
 .../nifi/processors/hive/TestPutHiveQL.java     | 560 +++++++++++++++++++
 .../nifi/processors/hive/TestSelectHiveQL.java  | 294 ++++++++++
 nifi-nar-bundles/nifi-hive-bundle/pom.xml       |  35 ++
 nifi-nar-bundles/pom.xml                        |   5 +-
 pom.xml                                         |   6 +
 18 files changed, 2770 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
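For context, the PutHiveQL processor added below executes the HiveQL statement carried in a FlowFile's content and binds any ? placeholders from hiveql.args.N.type / hiveql.args.N.value attributes, where the type is a JDBC type constant. A minimal sketch of that attribute convention follows (illustrative only, not part of this patch; the statement, table, and values are made up):

import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

public class PutHiveQLAttributesSketch {
    public static void main(String[] args) {
        // FlowFile content: a parameterized HiveQL statement with two placeholders
        String hiveQl = "INSERT INTO persons (id, name) VALUES (?, ?)";

        // FlowFile attributes: hiveql.args.N.type holds a JDBC type constant,
        // hiveql.args.N.value holds the value bound to the Nth placeholder
        Map<String, String> attributes = new HashMap<>();
        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); // 4
        attributes.put("hiveql.args.1.value", "1");
        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); // 12
        attributes.put("hiveql.args.2.value", "Joe");

        System.out.println(hiveQl + " with " + attributes);
    }
}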


http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index e697fe6..5820435 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -307,6 +307,11 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-spring-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hive-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-docs/src/main/asciidoc/getting-started.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/getting-started.adoc b/nifi-docs/src/main/asciidoc/getting-started.adoc
index 7c51888..a68f171 100644
--- a/nifi-docs/src/main/asciidoc/getting-started.adoc
+++ b/nifi-docs/src/main/asciidoc/getting-started.adoc
@@ -307,6 +307,8 @@ categorizing them by their functions.
 - *ConvertJSONToSQL*: Convert a JSON document into a SQL INSERT or UPDATE command that can then be passed to the PutSQL Processor
 - *ExecuteSQL*: Executes a user-defined SQL SELECT command, writing the results to a FlowFile in Avro format
 - *PutSQL*: Updates a database by executing the SQL DML statement defined by the FlowFile's content
+- *SelectHiveQL*: Executes a user-defined HiveQL SELECT command against an Apache Hive database, writing the results to a FlowFile in Avro or CSV format
+- *PutHiveQL*: Updates a Hive database by executing the HiveQL DDL/DML statement defined by the FlowFile's content
 
 [[AttributeExtraction]]
 === Attribute Extraction

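SelectHiveQL, per the description above, writes query results to the outgoing FlowFile in Avro or CSV. Assuming the Avro output is a standard Avro data file (as ExecuteSQL produces), a downstream consumer might read it roughly as below; this is a sketch only, not part of this patch:

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.io.InputStream;

public class ReadSelectHiveQLAvroSketch {
    // Prints each Avro record (one per result-set row) from a FlowFile's content stream
    public static void dump(InputStream flowFileContent) throws IOException {
        try (DataFileStream<GenericRecord> records =
                     new DataFileStream<>(flowFileContent, new GenericDatumReader<>())) {
            while (records.hasNext()) {
                System.out.println(records.next());
            }
        }
    }
}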
http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
new file mode 100644
index 0000000..d83f053
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hive-bundle</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hive-nar</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hive-processors</artifactId>
+            <version>0.7.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-libraries-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..35422bb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,330 @@
+nifi-hive-nar
+Copyright 2014-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Ant
+    The following NOTICE information applies:
+      Apache Ant
+      Copyright 1999-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Codec
+      The following NOTICE information applies:
+        Apache Commons Codec
+        Copyright 2002-2014 The Apache Software Foundation
+
+        src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+        contains test data from http://aspell.net/test/orig/batch0.tab.
+        Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org)
+
+        ===============================================================================
+
+        The content of package org.apache.commons.codec.language.bm has been translated
+        from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+        with permission from the original authors.
+        Original source copyright:
+        Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons DBCP
+    The following NOTICE information applies:
+      Apache Commons DBCP
+      Copyright 2001-2015 The Apache Software Foundation.
+
+  (ASLv2) Apache Commons EL
+    The following NOTICE information applies:
+      Apache Commons EL
+      Copyright 1999-2016 The Apache Software Foundation
+
+      EL-8 patch - Copyright 2004-2007 Jamie Taylor
+      http://issues.apache.org/jira/browse/EL-8
+
+  (ASLv2) Apache HttpComponents
+      The following NOTICE information applies:
+        Apache HttpComponents Client
+        Copyright 1999-2016 The Apache Software Foundation
+        Apache HttpComponents Core - HttpCore
+        Copyright 2006-2009 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Logging
+    The following NOTICE information applies:
+      Apache Commons Logging
+      Copyright 2003-2014 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Pool
+    The following NOTICE information applies:
+      Apache Commons Pool
+      Copyright 1999-2009 The Apache Software Foundation.
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2012 The Apache Software Foundation
+
+  (ASLv2) Apache Hive
+    The following NOTICE information applies:
+      Apache Hive
+      Copyright 2008-2015 The Apache Software Foundation
+
+      This product includes software developed by The Apache Software
+      Foundation (http://www.apache.org/).
+
+      This product includes Jersey (https://jersey.java.net/)
+      Copyright (c) 2010-2014 Oracle and/or its affiliates.
+
+      This project includes software copyrighted by Microsoft Corporation and
+      licensed under the Apache License, Version 2.0.
+
+      This project includes software copyrighted by Dell SecureWorks and
+      licensed under the Apache License, Version 2.0.
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+       ## Credits
+
+       A list of contributors may be found from CREDITS file, which is included
+       in some artifacts (usually source distributions); but is always available
+       from the source code management (SCM) system project uses.
+
+  (ASLv2) BoneCP
+    The following NOTICE information applies:
+       BoneCP
+       Copyright 2010 Wallace Wadge
+
+  (ASLv2) Apache Hadoop
+    The following NOTICE information applies:
+      The binary distribution of this product bundles binaries of
+      org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the
+      following notices:
+      * Copyright 2011 Dain Sundstrom <d...@iq80.com>
+      * Copyright 2011 FuseSource Corp. http://fusesource.com
+
+      The binary distribution of this product bundles binaries of
+      org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni),
+      which has the following notices:
+      * This product includes software developed by FuseSource Corp.
+        http://fusesource.com
+      * This product includes software developed at
+        Progress Software Corporation and/or its  subsidiaries or affiliates.
+      * This product includes software developed by IBM Corporation and others.
+
+  (ASLv2) Apache HBase
+    The following NOTICE information applies:
+      Apache HBase
+      Copyright 2007-2015 The Apache Software Foundation
+
+      --
+      This product incorporates portions of the 'Hadoop' project
+
+      Copyright 2007-2009 The Apache Software Foundation
+
+      Licensed under the Apache License v2.0
+      --
+      Our Orca logo we got here: http://www.vectorfree.com/jumping-orca
+      It is licensed Creative Commons Attribution 3.0.
+      See https://creativecommons.org/licenses/by/3.0/us/
+      We changed the logo by stripping the colored background, inverting
+      it and then rotating it some.
+
+      Later we found that vectorfree.com image is not properly licensed.
+      The original is owned by vectorportal.com. The original was
+      relicensed so we could use it as Creative Commons Attribution 3.0.
+      The license is bundled with the download available here:
+      http://www.vectorportal.com/subcategory/205/KILLER-WHALE-FREE-VECTOR.eps/ifile/9136/detailtest.asp
+      --
+      This product includes portions of the Bootstrap project v3.0.0
+
+      Copyright 2013 Twitter, Inc.
+
+      Licensed under the Apache License v2.0
+
+      This product uses the Glyphicons Halflings icon set.
+
+      http://glyphicons.com/
+
+      Copyright Jan Kovařík
+
+      Licensed under the Apache License v2.0 as a part of the Bootstrap project.
+
+      --
+      This product includes portions of the Guava project v14, specifically
+      'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
+
+      Copyright (C) 2007 The Guava Authors
+
+      Licensed under the Apache License, Version 2.0
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+  (ASLv2) Apache Curator
+    The following NOTICE information applies:
+      Apache Curator
+      Copyright 2013-2014 The Apache Software Foundation
+
+  (ASLv2) Apache Derby
+    The following NOTICE information applies:
+      Apache Derby
+      Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
+      the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.
+
+  (ASLv2) Apache DS
+    The following NOTICE information applies:
+      ApacheDS
+      Copyright 2003-2015 The Apache Software Foundation
+
+  (ASLv2) Apache Geronimo
+    The following NOTICE information applies:
+      Apache Geronimo
+      Copyright 2003-2008 The Apache Software Foundation
+
+  (ASLv2) HTrace Core
+    The following NOTICE information applies:
+      In addition, this product includes software dependencies. See
+      the accompanying LICENSE.txt for a listing of dependencies
+      that are NOT Apache licensed (with pointers to their licensing)
+
+      Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
+      is a distributed tracing system that is Apache 2.0 Licensed.
+      Copyright 2012 Twitter, Inc.
+
+  (ASLv2) Jettison
+    The following NOTICE information applies:
+       Copyright 2006 Envoi Solutions LLC
+
+  (ASLv2) Jetty
+    The following NOTICE information applies:
+       Jetty Web Container
+       Copyright 1995-2015 Mort Bay Consulting Pty Ltd.
+
+  (ASLv2) Apache log4j
+    The following NOTICE information applies:
+      Apache log4j
+      Copyright 2007 The Apache Software Foundation
+
+  (ASLv2) Parquet MR
+    The following NOTICE information applies:
+      Parquet MR
+      Copyright 2012 Twitter, Inc.
+
+      This project includes code from https://github.com/lemire/JavaFastPFOR
+      parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
+      Apache License Version 2.0 http://www.apache.org/licenses/.
+      (c) Daniel Lemire, http://lemire.me/en/
+
+  (ASLv2) Apache Thrift
+    The following NOTICE information applies:
+      Apache Thrift
+      Copyright 2006-2010 The Apache Software Foundation.
+
+  (ASLv2) Apache Twill
+    The following NOTICE information applies:
+      Apache Twill
+      Copyright 2013-2016 The Apache Software Foundation
+
+  (ASLv2) Dropwizard Metrics
+    The following NOTICE information applies:
+      Metrics
+      Copyright 2010-2013 Coda Hale and Yammer, Inc.
+
+      This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
+      LongAdder), which was released with the following comments:
+
+          Written by Doug Lea with assistance from members of JCP JSR-166
+          Expert Group and released to the public domain, as explained at
+          http://creativecommons.org/publicdomain/zero/1.0/
+
+  (ASLv2) Joda Time
+      The following NOTICE information applies:
+        This product includes software developed by
+        Joda.org (http://www.joda.org/).
+
+  (ASLv2) The Netty Project
+      The following NOTICE information applies:
+        The Netty Project
+        Copyright 2011 The Netty Project
+
+  (ASLv2) Apache Tomcat
+      The following NOTICE information applies:
+        Apache Tomcat
+        Copyright 2007 The Apache Software Foundation
+
+          Java Management Extensions (JMX) support is provided by
+          the MX4J package, which is open source software.  The
+          original software and related information is available
+          at http://mx4j.sourceforge.net.
+
+          Java compilation software for JSP pages is provided by Eclipse,
+          which is open source software.  The orginal software and
+          related infomation is available at
+          http://www.eclipse.org.
+
+  (ASLv2) Apache ZooKeeper
+     The following NOTICE information applies:
+       Apache ZooKeeper
+       Copyright 2009-2012 The Apache Software Foundation
+
+  (ASLv2) Google GSON
+     The following NOTICE information applies:
+       Copyright 2008 Google Inc.
+
+  (ASLv2) JPam
+    The following NOTICE information applies:
+      Copyright 2003-2006 Greg Luck
+
+  ************************
+  Common Development and Distribution License 1.1
+  ************************
+
+  The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net)
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/)
+      (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail)
+
+
+  ************************
+  Common Development and Distribution License 1.0
+  ************************
+
+    The following binary components are provided under the Common Development and Distribution License 1.0.  See project link for details.
+
+      (CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:2.5 - no url available)
+      (CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-2 - no url provided)
+      (CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
+      (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
+      (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
+
+  *****************
+  Public Domain
+  *****************
+
+  The following binary components are provided to the 'Public Domain'.  See project link for details.
+
+      (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)

http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
new file mode 100644
index 0000000..33c4552
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hive-bundle</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hive-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <hive.version>2.0.0</hive.version>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.thoughtworks.paranamer</groupId>
+                    <artifactId>paranamer</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-cli</groupId>
+                    <artifactId>commons-cli</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-collections</groupId>
+                    <artifactId>commons-collections</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.gson</groupId>
+                    <artifactId>gson</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-utils</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
new file mode 100644
index 0000000..07f1469
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.hive;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.jdbc.HiveDriver;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.KerberosTicketRenewer;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NiFiProperties;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Implementation for Database Connection Pooling Service used for Apache Hive connections. Apache DBCP is used for connection pooling functionality.
+ */
+@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
+@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. Connections can be asked from pool and returned after usage.")
+public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService {
+
+    public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
+            .name("hive-db-connect-url")
+            .displayName("Database Connection URL")
+            .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+                    + " The exact syntax of a database connection URL is specified by the Hive documentation. For example, the server principal is often included "
+                    + "as a connection parameter when connecting to a secure Hive server.")
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hive-config-resources")
+            .displayName("Hive Configuration Resources")
+            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+            .required(false).addValidator(createMultipleFilesExistValidator()).build();
+
+    public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
+            .name("hive-db-user")
+            .displayName("Database User")
+            .description("Database user name")
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
+            .name("hive-db-password")
+            .displayName("Password")
+            .description("The password for the database user")
+            .defaultValue(null)
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name("hive-max-wait-time")
+            .displayName("Max Wait Time")
+            .description("The maximum amount of time that the pool will wait (when there are no available connections) "
+                    + " for a connection to be returned before failing, or -1 to wait indefinitely. ")
+            .defaultValue("500 millis")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("hive-max-total-connections")
+            .displayName("Max Total Connections")
+            .description("The maximum number of active connections that can be allocated from this pool at the same time, "
+                    + "or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    static final long TICKET_RENEWAL_PERIOD = 60000;
+
+    private volatile UserGroupInformation ugi;
+    private volatile KerberosTicketRenewer renewer;
+
+    private final static List<PropertyDescriptor> properties;
+    private static KerberosProperties kerberosProperties;
+
+    private String  connectionUrl = "unknown";
+
+    // Holder of cached Configuration information so validation does not reload the same config over and over
+    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
+
+    private volatile BasicDataSource dataSource;
+
+
+    static {
+        kerberosProperties = 
KerberosProperties.create(NiFiProperties.getInstance());
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(DATABASE_URL);
+        props.add(HIVE_CONFIGURATION_RESOURCES);
+        props.add(kerberosProperties.getKerberosPrincipal());
+        props.add(kerberosProperties.getKerberosKeytab());
+        props.add(DB_USER);
+        props.add(DB_PASSWORD);
+        props.add(MAX_WAIT_TIME);
+        props.add(MAX_TOTAL_CONNECTIONS);
+        properties = Collections.unmodifiableList(props);
+    }
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        boolean confFileProvided = 
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
+
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (confFileProvided) {
+            final String configFiles = 
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+            ValidationResources resources = validationResourceHolder.get();
+
+            // if no resources in the holder, or if the holder has different 
resources loaded,
+            // then load the Configuration and set the new resources in the 
holder
+            if (resources == null || 
!configFiles.equals(resources.getConfigResources())) {
+                getLogger().debug("Reloading validation resources");
+                resources = new ValidationResources(configFiles, 
getConfigurationFromFiles(configFiles));
+                validationResourceHolder.set(resources);
+            }
+
+            final Configuration hiveConfig = resources.getConfiguration();
+            final String principal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+            final String keytab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+            problems.addAll(KerberosProperties.validatePrincipalAndKeytab(
+                    this.getClass().getSimpleName(), hiveConfig, principal, 
keytab, getLogger()));
+        }
+
+        return problems;
+    }
+
+    protected Configuration getConfigurationFromFiles(final String 
configFiles) {
+        final Configuration hiveConfig = new HiveConf();
+        if (StringUtils.isNotBlank(configFiles)) {
+            for (final String configFile : configFiles.split(",")) {
+                hiveConfig.addResource(new Path(configFile.trim()));
+            }
+        }
+        return hiveConfig;
+    }
+
+    /**
+     * Configures connection pool by creating an instance of the
+     * {@link BasicDataSource} based on configuration provided with
+     * {@link ConfigurationContext}.
+     * <p>
+     * This operation makes no guarantees that the actual connection could be
+     * made since the underlying system may still go off-line during normal
+     * operation of the connection pool.
+     *
+     * @param context the configuration context
+     * @throws InitializationException if unable to create a database 
connection
+     */
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws 
InitializationException, IOException {
+
+        connectionUrl = context.getProperty(DATABASE_URL).getValue();
+
+        final String configFiles = 
context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        final Configuration hiveConfig = 
getConfigurationFromFiles(configFiles);
+
+        // add any dynamic properties to the Hive configuration
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                hiveConfig.set(descriptor.getName(), entry.getValue());
+            }
+        }
+
+        final String drv = HiveDriver.class.getName();
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+            getLogger().info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab);
+            getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
+
+            // if we got here then we have a ugi so start a renewer
+            if (ugi != null) {
+                final String id = getClass().getSimpleName();
+                renewer = SecurityUtil.startTicketRenewalThread(id, ugi, 
TICKET_RENEWAL_PERIOD, getLogger());
+            }
+        }
+        final String user = context.getProperty(DB_USER).getValue();
+        final String passw = context.getProperty(DB_PASSWORD).getValue();
+        final Long maxWaitMillis = 
context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Integer maxTotal = 
context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
+
+        dataSource = new BasicDataSource();
+        dataSource.setDriverClassName(drv);
+
+        final String dburl = context.getProperty(DATABASE_URL).getValue();
+
+        dataSource.setMaxWait(maxWaitMillis);
+        dataSource.setMaxActive(maxTotal);
+
+        dataSource.setUrl(dburl);
+        dataSource.setUsername(user);
+        dataSource.setPassword(passw);
+    }
+
+    /**
+     * Shutdown pool, close all open connections.
+     */
+    @OnDisabled
+    public void shutdown() {
+
+        if (renewer != null) {
+            renewer.stop();
+        }
+
+        try {
+            dataSource.close();
+        } catch (final SQLException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    @Override
+    public Connection getConnection() throws ProcessException {
+        try {
+            if (ugi != null) {
+                return ugi.doAs(new PrivilegedExceptionAction<Connection>() {
+                    @Override
+                    public Connection run() throws Exception {
+                        return dataSource.getConnection();
+                    }
+                });
+
+            } else {
+                getLogger().info("Simple Authentication");
+                return dataSource.getConnection();
+            }
+        } catch (SQLException | IOException | InterruptedException e) {
+            getLogger().error("Error getting Hive connection", e);
+            throw new ProcessException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "HiveConnectionPool[id=" + getIdentifier() + "]";
+    }
+
+    /**
+     * Validates that one or more files exist, as specified in a single 
property.
+     */
+    public static Validator createMultipleFilesExistValidator() {
+        return new Validator() {
+
+            @Override
+            public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+                final String[] files = input.split(",");
+                for (String filename : files) {
+                    try {
+                        final File file = new File(filename.trim());
+                        final boolean valid = file.exists() && file.isFile();
+                        if (!valid) {
+                            final String message = "File " + file + " does not 
exist or is not a file";
+                            return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
+                        }
+                    } catch (SecurityException e) {
+                        final String message = "Unable to access " + filename 
+ " due to " + e.getMessage();
+                        return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
+                    }
+                }
+                return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            }
+
+        };
+    }
+
+    @Override
+    public String getConnectionURL() {
+        return connectionUrl;
+    }
+
+    private static class ValidationResources {
+        private final String configResources;
+        private final Configuration configuration;
+
+        public ValidationResources(String configResources, Configuration 
configuration) {
+            this.configResources = configResources;
+            this.configuration = configuration;
+        }
+
+        public String getConfigResources() {
+            return configResources;
+        }
+
+        public Configuration getConfiguration() {
+            return configuration;
+        }
+    }
+
+}

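The pool above hands out standard JDBC Connections through HiveDBCPService.getConnection(). A minimal sketch of how a caller might use it (this mirrors what PutHiveQL below does, minus batching, parameter binding, and provenance reporting; the class and method names here are illustrative only):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.nifi.dbcp.hive.HiveDBCPService;

public class HiveQLExecutionSketch {
    public static void execute(HiveDBCPService hivePool, String hiveQl) throws SQLException {
        // Borrow a pooled connection, run the statement, and return the connection on close
        try (Connection conn = hivePool.getConnection();
             PreparedStatement stmt = conn.prepareStatement(hiveQl)) {
            stmt.execute();
        }
    }
}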
http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java
new file mode 100644
index 0000000..4bd5320
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp.hive;
+
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.dbcp.DBCPService;
+
+/**
+ * Definition for Database Connection Pooling Service.
+ *
+ */
+@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
+@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. Connections can be asked from pool and returned after usage.")
+public interface HiveDBCPService extends DBCPService {
+    public String getConnectionURL();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
new file mode 100644
index 0000000..e18e464
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.HiveDBCPService;
+import org.apache.nifi.processor.AbstractProcessor;
+
+/**
+ * An abstract base class for HiveQL processors to share common data, methods, etc.
+ */
+public abstract class AbstractHiveQLProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("Hive Database Connection Pooling Service")
+            .description("The Hive Controller Service that is used to obtain connection(s) to the Hive database")
+            .required(true)
+            .identifiesControllerService(HiveDBCPService.class)
+            .build();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
new file mode 100644
index 0000000..27a7f01
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.HiveDBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SeeAlso(SelectHiveQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"sql", "hive", "put", "database", "update", "insert"})
+@CapabilityDescription("Executes a HiveQL DDL/DML command (UPDATE, INSERT, 
e.g.). The content of an incoming FlowFile is expected to be the HiveQL command 
"
+        + "to execute. The HiveQL command may use the ? to escape parameters. 
In this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention hiveql.args.N.type and 
hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is 
expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is 
expected to be in UTF-8 format.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "hiveql.args.N.type", description = 
"Incoming FlowFiles are expected to be parameterized HiveQL statements. The 
type of each Parameter is specified as an integer "
+                + "that represents the JDBC Type of the parameter."),
+        @ReadsAttribute(attribute = "hiveql.args.N.value", description = 
"Incoming FlowFiles are expected to be parameterized HiveQL statements. The 
value of the Parameters are specified as "
+                + "hiveql.args.1.value, hiveql.args.2.value, 
hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter 
is specified by the hiveql.args.1.type attribute.")
+})
+public class PutHiveQL extends AbstractHiveQLProcessor {
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("hive-batch-size")
+            .displayName("Batch Size")
+            .description("The preferred number of FlowFiles to put to the database in a single transaction")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("hive-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set of the record data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
database is successfully updated")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
+                    + "such as an invalid query or an integrity constraint 
violation")
+            .build();
+
+    private static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
+    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+
+    /*
+     * Will ensure that the list of property descriptors is build only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+        _propertyDescriptors.add(BATCH_SIZE);
+        _propertyDescriptors.add(CHARSET);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFiles = session.get(batchSize);
+
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
+        try (final Connection conn = dbcpService.getConnection()) {
+
+            for (FlowFile flowFile : flowFiles) {
+                try {
+                    final String hiveQL = getHiveQL(session, flowFile, charset);
+                    final PreparedStatement stmt = conn.prepareStatement(hiveQL);
+                    setParameters(stmt, flowFile.getAttributes());
+
+                    // Execute the statement
+                    stmt.execute();
+
+                    // Emit a Provenance SEND event
+                    final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                    session.getProvenanceReporter().send(flowFile, dbcpService.getConnectionURL(), transmissionMillis, true);
+                    session.transfer(flowFile, REL_SUCCESS);
+
+                } catch (final SQLException e) {
+
+                    if (e instanceof SQLNonTransientException) {
+                        getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[]{flowFile, e});
+                        session.transfer(flowFile, REL_FAILURE);
+                    } else {
+                        getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{flowFile, e});
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_RETRY);
+                    }
+
+                }
+            }
+        } catch (final SQLException sqle) {
+            // There was a problem getting the connection, yield and retry the flowfiles
+            getLogger().error("Failed to get Hive connection due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{sqle});
+            session.transfer(flowFiles, REL_RETRY);
+            context.yield();
+        }
+    }
+
+    /**
+     * Determines the HiveQL statement that should be executed for the given FlowFile
+     *
+     * @param session  the session that can be used to access the given FlowFile
+     * @param flowFile the FlowFile whose HiveQL statement should be executed
+     * @param charset  the character set in which the FlowFile content is encoded
+     * @return the HiveQL that is associated with the given FlowFile
+     */
+    private String getHiveQL(final ProcessSession session, final FlowFile flowFile, final Charset charset) {
+        // Read the HiveQL from the FlowFile's content
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer);
+            }
+        });
+
+        // The FlowFile content, decoded with the configured character set, is the HiveQL statement.
+        return new String(buffer, charset);
+    }
+
+
+    /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
+     *
+     * @param stmt       the statement to set the parameters on
+     * @param attributes the attributes from which to derive parameter indices, values, and types
+     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called
+     */
+    private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String key = entry.getKey();
+            final Matcher matcher = HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+            if (matcher.matches()) {
+                final int parameterIndex = Integer.parseInt(matcher.group(1));
+
+                final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
+                if (!isNumeric) {
+                    throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type");
+                }
+
+                final int jdbcType = Integer.parseInt(entry.getValue());
+                final String valueAttrName = "hiveql.args." + parameterIndex + ".value";
+                final String parameterValue = attributes.get(valueAttrName);
+
+                try {
+                    setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
+                } catch (final NumberFormatException nfe) {
+                    throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
+                }
+            }
+        }
+    }
+
+    /**
+     * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
+     * provided PreparedStatement
+     *
+     * @param stmt           the PreparedStatement to set the parameter on
+     * @param attrName       the name of the attribute that the parameter is coming from - for logging purposes
+     * @param parameterIndex the index of the HiveQL parameter to set
+     * @param parameterValue the value of the HiveQL parameter to set
+     * @param jdbcType       the JDBC Type of the HiveQL parameter to set
+     * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
+     */
+    private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+        if (parameterValue == null) {
+            stmt.setNull(parameterIndex, jdbcType);
+        } else {
+            try {
+                switch (jdbcType) {
+                    case Types.BIT:
+                    case Types.BOOLEAN:
+                        stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+                        break;
+                    case Types.TINYINT:
+                        stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+                        break;
+                    case Types.SMALLINT:
+                        stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+                        break;
+                    case Types.INTEGER:
+                        stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+                        break;
+                    case Types.BIGINT:
+                        stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
+                        break;
+                    case Types.REAL:
+                        stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
+                        break;
+                    case Types.FLOAT:
+                    case Types.DOUBLE:
+                        stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
+                        break;
+                    case Types.DECIMAL:
+                    case Types.NUMERIC:
+                        stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
+                        break;
+                    case Types.DATE:
+                        stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
+                        break;
+                    case Types.TIME:
+                        stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
+                        break;
+                    case Types.TIMESTAMP:
+                        stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
+                        break;
+                    case Types.CHAR:
+                    case Types.VARCHAR:
+                    case Types.LONGNVARCHAR:
+                    case Types.LONGVARCHAR:
+                        stmt.setString(parameterIndex, parameterValue);
+                        break;
+                    default:
+                        stmt.setObject(parameterIndex, parameterValue, jdbcType);
+                        break;
+                }
+            } catch (SQLException e) {
+                // Log which attribute/parameter had an error, then rethrow to be handled at the top level
+                getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, e);
+                throw e;
+            }
+        }
+    }
+
+}
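
For context, a minimal sketch (not part of this commit) of how an upstream component might populate the hiveql.args.N.type / hiveql.args.N.value attributes that PutHiveQL binds to the parameterized statement carried in the FlowFile content. The class name, statement text, and values are illustrative only; the java.sql.Types constants correspond to the cases handled in setParameter() above.

import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: attribute map for a FlowFile whose content is
//   INSERT INTO users (name, age, created) VALUES (?, ?, ?)
public class PutHiveQLAttributeExample {
    public static Map<String, String> exampleAttributes() {
        final Map<String, String> attrs = new HashMap<>();
        attrs.put("hiveql.args.1.type", String.valueOf(Types.VARCHAR));   // bound via setString
        attrs.put("hiveql.args.1.value", "Mark");
        attrs.put("hiveql.args.2.type", String.valueOf(Types.INTEGER));   // bound via setInt
        attrs.put("hiveql.args.2.value", "42");
        attrs.put("hiveql.args.3.type", String.valueOf(Types.TIMESTAMP)); // bound via setTimestamp
        attrs.put("hiveql.args.3.value", "1462295514000");                // epoch milliseconds
        return attrs;
    }
}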

http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
new file mode 100644
index 0000000..1b65af6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.HiveDBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.LongHolder;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.hive.HiveJdbcCommon;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"hive", "sql", "select", "jdbc", "query", "database"})
+@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive 
database connection. Query result will be converted to Avro or CSV format."
+        + " Streaming is used so arbitrarily large result sets are supported. 
This processor can be scheduled to run on "
+        + "a timer, or cron expression, using the standard scheduling methods, 
or it can be triggered by an incoming FlowFile. "
+        + "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the "
+        + "select query. FlowFile attribute 'selecthiveql.row.count' indicates 
how many rows were selected.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the MIME 
type for the outgoing flowfile to application/avro-binary for Avro or text/csv 
for CSV."),
+        @WritesAttribute(attribute = "filename", description = "Adds .avro or 
.csv to the filename attribute depending on which output format is selected."),
+        @WritesAttribute(attribute = "selecthiveql.row.count", description = 
"Indicates how many rows were selected/returned by the query.")
+})
+public class SelectHiveQL extends AbstractHiveQLProcessor {
+
+    public static final String RESULT_ROW_COUNT = "selecthiveql.row.count";
+
+    protected static final String AVRO = "Avro";
+    protected static final String CSV = "CSV";
+
+    public static final String AVRO_MIME_TYPE = "application/avro-binary";
+    public static final String CSV_MIME_TYPE = "text/csv";
+
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from HiveQL query 
result set.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("HiveQL query execution failed. Incoming FlowFile 
will be penalized and routed to this relationship")
+            .build();
+
+
+    public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
+            .name("hive-query")
+            .displayName("HiveQL Select Query")
+            .description("HiveQL SELECT query to execute")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor HIVEQL_OUTPUT_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("hive-output-format")
+            .displayName("Output Format")
+            .description("How to represent the records coming from Hive (Avro, 
CSV, e.g.)")
+            .required(true)
+            .allowableValues(AVRO, CSV)
+            .defaultValue(AVRO)
+            .expressionLanguageSupported(false)
+            .build();
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+
+    /*
+     * Will ensure that the list of property descriptors is built only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+        _propertyDescriptors.add(HIVEQL_SELECT_QUERY);
+        _propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile fileToProcess = null;
+        if (context.hasIncomingConnection()) {
+            fileToProcess = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (fileToProcess == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        final ProcessorLog logger = getLogger();
+        final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
+        final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        try (final Connection con = dbcpService.getConnection();
+             final Statement st = con.createStatement()) {
+            final LongHolder nrOfRows = new LongHolder(0L);
+            if (fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    try {
+                        logger.debug("Executing query {}", new Object[]{selectQuery});
+                        final ResultSet resultSet = st.executeQuery(selectQuery);
+                        if (AVRO.equals(outputFormat)) {
+                            nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out));
+                        } else if (CSV.equals(outputFormat)) {
+                            nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out));
+                        } else {
+                            nrOfRows.set(0L);
+                            throw new ProcessException("Unsupported output format: " + outputFormat);
+                        }
+                    } catch (final SQLException e) {
+                        throw new ProcessException(e);
+                    }
+                }
+            });
+
+            // set attribute how many rows were selected
+            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString());
+
+            // Set MIME type on output document and add extension
+            if (AVRO.equals(outputFormat)) {
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.FILENAME.key(), fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) + ".avro");
+            } else if (CSV.equals(outputFormat)) {
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE);
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.FILENAME.key(), fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) + ".csv");
+            }
+
+            logger.info("{} contains {} Avro records; transferring to 
'success'",
+                    new Object[]{fileToProcess, nrOfRows.get()});
+
+            if (context.hasIncomingConnection()) {
+                // If the flow file came from an incoming connection, issue a Modify Content provenance event
+
+                session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
+                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            } else {
+                // If we created a flow file from rows received from Hive, issue a Receive provenance event
+                session.getProvenanceReporter().receive(fileToProcess, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+            session.transfer(fileToProcess, REL_SUCCESS);
+        } catch (final ProcessException | SQLException e) {
+            if (fileToProcess == null) {
+                // This can happen if any exceptions occur while setting up the connection, statement, etc.
+                logger.error("Unable to execute HiveQL select query {} due to {}. No FlowFile to route to failure",
+                        new Object[]{selectQuery, e});
+                context.yield();
+            } else {
+                if (context.hasIncomingConnection()) {
+                    logger.error("Unable to execute HiveQL select query {} for {} due to {}; routing to failure",
+                            new Object[]{selectQuery, fileToProcess, e});
+                    fileToProcess = session.penalize(fileToProcess);
+                } else {
+                    logger.error("Unable to execute HiveQL select query {} due to {}; routing to failure",
+                            new Object[]{selectQuery, e});
+                    context.yield();
+                }
+                session.transfer(fileToProcess, REL_FAILURE);
+            }
+        }
+    }
+}
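
For downstream consumers, a minimal sketch (not part of this commit) of reading the content that SelectHiveQL emits when the Output Format property is left at its Avro default. The class and method names are illustrative; the only assumption is the Apache Avro library that HiveJdbcCommon already uses for convertToAvroStream.

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.io.InputStream;

// Illustrative only: counts the rows in an Avro FlowFile produced by SelectHiveQL,
// given its content as an InputStream (e.g. from a downstream processor or a test).
public class ReadSelectHiveQLOutput {
    public static long countRecords(final InputStream in) throws IOException {
        long rows = 0;
        try (DataFileStream<GenericRecord> reader =
                     new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                reader.next();  // each record corresponds to one row of the result set
                rows++;
            }
        }
        return rows;
    }
}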
