NIFI-981: Added SelectHiveQL and PutHiveQL processors This closes #384.
Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/59bc8264 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/59bc8264 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/59bc8264 Branch: refs/heads/0.x Commit: 59bc8264749465af224eb003786ec4bd504f10da Parents: 9fdb315 Author: Matt Burgess <mattyb...@apache.org> Authored: Tue May 3 13:11:54 2016 -0400 Committer: Bryan Bende <bbe...@apache.org> Committed: Tue May 3 13:41:44 2016 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 5 + .../src/main/asciidoc/getting-started.adoc | 2 + .../nifi-hive-bundle/nifi-hive-nar/pom.xml | 46 ++ .../src/main/resources/META-INF/NOTICE | 330 +++++++++++ .../nifi-hive-processors/pom.xml | 165 ++++++ .../nifi/dbcp/hive/HiveConnectionPool.java | 353 ++++++++++++ .../apache/nifi/dbcp/hive/HiveDBCPService.java | 32 ++ .../hive/AbstractHiveQLProcessor.java | 35 ++ .../apache/nifi/processors/hive/PutHiveQL.java | 314 +++++++++++ .../nifi/processors/hive/SelectHiveQL.java | 229 ++++++++ .../apache/nifi/util/hive/HiveJdbcCommon.java | 330 +++++++++++ ...org.apache.nifi.controller.ControllerService | 15 + .../org.apache.nifi.processor.Processor | 16 + .../nifi/processors/hive/TestPutHiveQL.java | 560 +++++++++++++++++++ .../nifi/processors/hive/TestSelectHiveQL.java | 294 ++++++++++ nifi-nar-bundles/nifi-hive-bundle/pom.xml | 35 ++ nifi-nar-bundles/pom.xml | 5 +- pom.xml | 6 + 18 files changed, 2770 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index e697fe6..5820435 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -307,6 +307,11 @@ language 
governing permissions and limitations under the License. --> <artifactId>nifi-spring-nar</artifactId> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hive-nar</artifactId> + <type>nar</type> + </dependency> </dependencies> <properties> http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-docs/src/main/asciidoc/getting-started.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/getting-started.adoc b/nifi-docs/src/main/asciidoc/getting-started.adoc index 7c51888..a68f171 100644 --- a/nifi-docs/src/main/asciidoc/getting-started.adoc +++ b/nifi-docs/src/main/asciidoc/getting-started.adoc @@ -307,6 +307,8 @@ categorizing them by their functions. - *ConvertJSONToSQL*: Convert a JSON document into a SQL INSERT or UPDATE command that can then be passed to the PutSQL Processor - *ExecuteSQL*: Executes a user-defined SQL SELECT command, writing the results to a FlowFile in Avro format - *PutSQL*: Updates a database by executing the SQL DDM statement defined by the FlowFile's content +- *SelectHiveQL*: Executes a user-defined HiveQL SELECT command against an Apache Hive database, writing the results to a FlowFile in Avro or CSV format +- *PutHiveQL*: Updates a Hive database by executing the HiveQL DDM statement defined by the FlowFile's content [[AttributeExtraction]] === Attribute Extraction http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml new file mode 100644 index 0000000..d83f053 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/pom.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + 
contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hive-bundle</artifactId> + <version>0.7.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-hive-nar</artifactId> + <version>0.7.0-SNAPSHOT</version> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hive-processors</artifactId> + <version>0.7.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-libraries-nar</artifactId> + <type>nar</type> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..35422bb --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,330 @@ +nifi-hive-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Ant + The following NOTICE information applies: + Apache Ant + Copyright 1999-2016 The Apache Software Foundation + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons DBCP + The following NOTICE information applies: + Apache Commons DBCP + Copyright 2001-2015 The Apache Software Foundation. 
+ + (ASLv2) Apache Commons EL + The following NOTICE information applies: + Apache Commons EL + Copyright 1999-2016 The Apache Software Foundation + + EL-8 patch - Copyright 2004-2007 Jamie Taylor + http://issues.apache.org/jira/browse/EL-8 + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpComponents Client + Copyright 1999-2016 The Apache Software Foundation + Apache HttpComponents Core - HttpCore + Copyright 2006-2009 The Apache Software Foundation + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + Apache Commons Logging + Copyright 2003-2014 The Apache Software Foundation + + (ASLv2) Apache Commons Pool + The following NOTICE information applies: + Apache Commons Pool + Copyright 1999-2009 The Apache Software Foundation. + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2012 The Apache Software Foundation + + (ASLv2) Apache Hive + The following NOTICE information applies: + Apache Hive + Copyright 2008-2015 The Apache Software Foundation + + This product includes software developed by The Apache Software + Foundation (http://www.apache.org/). + + This product includes Jersey (https://jersey.java.net/) + Copyright (c) 2010-2014 Oracle and/or its affiliates. + + This project includes software copyrighted by Microsoft Corporation and + licensed under the Apache License, Version 2.0. + + This project includes software copyrighted by Dell SecureWorks and + licensed under the Apache License, Version 2.0. + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. 
+ + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) BoneCP + The following NOTICE information applies: + BoneCP + Copyright 2010 Wallace Wadge + + (ASLv2) Apache Hadoop + The following NOTICE information applies: + The binary distribution of this product bundles binaries of + org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the + following notices: + * Copyright 2011 Dain Sundstrom <d...@iq80.com> + * Copyright 2011 FuseSource Corp. http://fusesource.com + + The binary distribution of this product bundles binaries of + org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni), + which has the following notices: + * This product includes software developed by FuseSource Corp. + http://fusesource.com + * This product includes software developed at + Progress Software Corporation and/or its subsidiaries or affiliates. + * This product includes software developed by IBM Corporation and others. + + (ASLv2) Apache HBase + The following NOTICE information applies: + Apache HBase + Copyright 2007-2015 The Apache Software Foundation + + -- + This product incorporates portions of the 'Hadoop' project + + Copyright 2007-2009 The Apache Software Foundation + + Licensed under the Apache License v2.0 + -- + Our Orca logo we got here: http://www.vectorfree.com/jumping-orca + It is licensed Creative Commons Attribution 3.0. 
+ See https://creativecommons.org/licenses/by/3.0/us/ + We changed the logo by stripping the colored background, inverting + it and then rotating it some. + + Later we found that vectorfree.com image is not properly licensed. + The original is owned by vectorportal.com. The original was + relicensed so we could use it as Creative Commons Attribution 3.0. + The license is bundled with the download available here: + http://www.vectorportal.com/subcategory/205/KILLER-WHALE-FREE-VECTOR.eps/ifile/9136/detailtest.asp + -- + This product includes portions of the Bootstrap project v3.0.0 + + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License v2.0 + + This product uses the Glyphicons Halflings icon set. + + http://glyphicons.com/ + + Copyright Jan Kovařík + + Licensed under the Apache License v2.0 as a part of the Bootstrap project. + + -- + This product includes portions of the Guava project v14, specifically + 'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java' + + Copyright (C) 2007 The Guava Authors + + Licensed under the Apache License, Version 2.0 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + (ASLv2) Apache Curator + The following NOTICE information applies: + Apache Curator + Copyright 2013-2014 The Apache Software Foundation + + (ASLv2) Apache Derby + The following NOTICE information applies: + Apache Derby + Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils, + the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation. 
+ + (ASLv2) Apache DS + The following NOTICE information applies: + ApacheDS + Copyright 2003-2015 The Apache Software Foundation + + (ASLv2) Apache Geronimo + The following NOTICE information applies: + Apache Geronimo + Copyright 2003-2008 The Apache Software Foundation + + (ASLv2) HTrace Core + The following NOTICE information applies: + In addition, this product includes software dependencies. See + the accompanying LICENSE.txt for a listing of dependencies + that are NOT Apache licensed (with pointers to their licensing) + + Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin + is a distributed tracing system that is Apache 2.0 Licensed. + Copyright 2012 Twitter, Inc. + + (ASLv2) Jettison + The following NOTICE information applies: + Copyright 2006 Envoi Solutions LLC + + (ASLv2) Jetty + The following NOTICE information applies: + Jetty Web Container + Copyright 1995-2015 Mort Bay Consulting Pty Ltd. + + (ASLv2) Apache log4j + The following NOTICE information applies: + Apache log4j + Copyright 2007 The Apache Software Foundation + + (ASLv2) Parquet MR + The following NOTICE information applies: + Parquet MR + Copyright 2012 Twitter, Inc. + + This project includes code from https://github.com/lemire/JavaFastPFOR + parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java + Apache License Version 2.0 http://www.apache.org/licenses/. + (c) Daniel Lemire, http://lemire.me/en/ + + (ASLv2) Apache Thrift + The following NOTICE information applies: + Apache Thrift + Copyright 2006-2010 The Apache Software Foundation. + + (ASLv2) Apache Twill + The following NOTICE information applies: + Apache Twill + Copyright 2013-2016 The Apache Software Foundation + + (ASLv2) Dropwizard Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2013 Coda Hale and Yammer, Inc. 
+ + This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, + LongAdder), which was released with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ + + (ASLv2) Joda Time + The following NOTICE information applies: + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) The Netty Project + The following NOTICE information applies: + The Netty Project + Copyright 2011 The Netty Project + + (ASLv2) Apache Tomcat + The following NOTICE information applies: + Apache Tomcat + Copyright 2007 The Apache Software Foundation + + Java Management Extensions (JMX) support is provided by + the MX4J package, which is open source software. The + original software and related information is available + at http://mx4j.sourceforge.net. + + Java compilation software for JSP pages is provided by Eclipse, + which is open source software. The orginal software and + related infomation is available at + http://www.eclipse.org. + + (ASLv2) Apache ZooKeeper + The following NOTICE information applies: + Apache ZooKeeper + Copyright 2009-2012 The Apache Software Foundation + + (ASLv2) Google GSON + The following NOTICE information applies: + Copyright 2008 Google Inc. + + (ASLv2) JPam + The following NOTICE information applies: + Copyright 2003-2006 Greg Luck + + ************************ + Common Development and Distribution License 1.1 + ************************ + + The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. 
+ + (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net) + (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail) + + + ************************ + Common Development and Distribution License 1.0 + ************************ + + The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details. + + (CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:2.5 - no url available) + (CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-2 - no url provided) + (CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp) + (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net) + (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net) + + ***************** + Public Domain + ***************** + + The following binary components are provided to the 'Public Domain'. See project link for details. 
+ + (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/) http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml new file mode 100644 index 0000000..33c4552 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml @@ -0,0 +1,165 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hive-bundle</artifactId> + <version>0.7.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-hive-processors</artifactId> + <packaging>jar</packaging> + + <properties> + <hive.version>2.0.0</hive.version> + </properties> + + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-dbcp-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>commons-codec</groupId> + 
<artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </exclusion> + <exclusion> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-utils</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + 
</dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java new file mode 100644 index 0000000..07f1469 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.dbcp.hive; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.jdbc.HiveDriver; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hadoop.KerberosTicketRenewer; +import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NiFiProperties; + +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Implementation for Database Connection Pooling Service used for Apache Hive connections. Apache DBCP is used for connection pooling functionality. 
+ */ +@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"}) +@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. Connections can be asked from pool and returned after usage.") +public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService { + + public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() + .name("hive-db-connect-url") + .displayName("Database Connection URL") + .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters." + + " The exact syntax of a database connection URL is specified by the Hive documentation. For example, the server principal is often included " + + "as a connection parameter when connecting to a secure Hive server.") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() + .name("hive-config-resources") + .displayName("Hive Configuration Resources") + .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop " + + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication " + + "with Kerberos e.g., the appropriate properties must be set in the configuration files. 
Please see the Hive documentation for more details.") + .required(false).addValidator(createMultipleFilesExistValidator()).build(); + + public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() + .name("hive-db-user") + .displayName("Database User") + .description("Database user name") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() + .name("hive-db-password") + .displayName("Password") + .description("The password for the database user") + .defaultValue(null) + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() + .name("hive-max-wait-time") + .displayName("Max Wait Time") + .description("The maximum amount of time that the pool will wait (when there are no available connections) " + + " for a connection to be returned before failing, or -1 to wait indefinitely. 
") + .defaultValue("500 millis") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder() + .name("hive-max-total-connections") + .displayName("Max Total Connections") + .description("The maximum number of active connections that can be allocated from this pool at the same time, " + + "or negative for no limit.") + .defaultValue("8") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .sensitive(false) + .build(); + + static final long TICKET_RENEWAL_PERIOD = 60000; + + private volatile UserGroupInformation ugi; + private volatile KerberosTicketRenewer renewer; + + private final static List<PropertyDescriptor> properties; + private static KerberosProperties kerberosProperties; + + private String connectionUrl = "unknown"; + + // Holder of cached Configuration information so validation does not reload the same config over and over + private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>(); + + private volatile BasicDataSource dataSource; + + + static { + kerberosProperties = KerberosProperties.create(NiFiProperties.getInstance()); + List<PropertyDescriptor> props = new ArrayList<>(); + props.add(DATABASE_URL); + props.add(HIVE_CONFIGURATION_RESOURCES); + props.add(kerberosProperties.getKerberosPrincipal()); + props.add(kerberosProperties.getKerberosKeytab()); + props.add(DB_USER); + props.add(DB_PASSWORD); + props.add(MAX_WAIT_TIME); + props.add(MAX_TOTAL_CONNECTIONS); + properties = Collections.unmodifiableList(props); + } + + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet(); + + final 
List<ValidationResult> problems = new ArrayList<>(); + + if (confFileProvided) { + final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue(); + ValidationResources resources = validationResourceHolder.get(); + + // if no resources in the holder, or if the holder has different resources loaded, + // then load the Configuration and set the new resources in the holder + if (resources == null || !configFiles.equals(resources.getConfigResources())) { + getLogger().debug("Reloading validation resources"); + resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles)); + validationResourceHolder.set(resources); + } + + final Configuration hiveConfig = resources.getConfiguration(); + final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + problems.addAll(KerberosProperties.validatePrincipalAndKeytab( + this.getClass().getSimpleName(), hiveConfig, principal, keytab, getLogger())); + } + + return problems; + } + + protected Configuration getConfigurationFromFiles(final String configFiles) { + final Configuration hiveConfig = new HiveConf(); + if (StringUtils.isNotBlank(configFiles)) { + for (final String configFile : configFiles.split(",")) { + hiveConfig.addResource(new Path(configFile.trim())); + } + } + return hiveConfig; + } + + /** + * Configures connection pool by creating an instance of the + * {@link BasicDataSource} based on configuration provided with + * {@link ConfigurationContext}. + * <p> + * This operation makes no guarantees that the actual connection could be + * made since the underlying system may still go off-line during normal + * operation of the connection pool. 
+ * + * @param context the configuration context + * @throws InitializationException if unable to create a database connection + */ + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException, IOException { + + connectionUrl = context.getProperty(DATABASE_URL).getValue(); + + final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue(); + final Configuration hiveConfig = getConfigurationFromFiles(configFiles); + + // add any dynamic properties to the Hive configuration + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + hiveConfig.set(descriptor.getName(), entry.getValue()); + } + } + + final String drv = HiveDriver.class.getName(); + if (SecurityUtil.isSecurityEnabled(hiveConfig)) { + final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab}); + ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab); + getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab}); + + // if we got here then we have a ugi so start a renewer + if (ugi != null) { + final String id = getClass().getSimpleName(); + renewer = SecurityUtil.startTicketRenewalThread(id, ugi, TICKET_RENEWAL_PERIOD, getLogger()); + } + } + final String user = context.getProperty(DB_USER).getValue(); + final String passw = context.getProperty(DB_PASSWORD).getValue(); + final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); + + dataSource = new BasicDataSource(); + 
dataSource.setDriverClassName(drv); + + final String dburl = context.getProperty(DATABASE_URL).getValue(); + + dataSource.setMaxWait(maxWaitMillis); + dataSource.setMaxActive(maxTotal); + + dataSource.setUrl(dburl); + dataSource.setUsername(user); + dataSource.setPassword(passw); + } + + /** + * Shutdown pool, close all open connections. + */ + @OnDisabled + public void shutdown() { + + if (renewer != null) { + renewer.stop(); + } + + try { + dataSource.close(); + } catch (final SQLException e) { + throw new ProcessException(e); + } + } + + @Override + public Connection getConnection() throws ProcessException { + try { + if (ugi != null) { + return ugi.doAs(new PrivilegedExceptionAction<Connection>() { + @Override + public Connection run() throws Exception { + return dataSource.getConnection(); + } + }); + + } else { + getLogger().info("Simple Authentication"); + return dataSource.getConnection(); + } + } catch (SQLException | IOException | InterruptedException e) { + getLogger().error("Error getting Hive connection", e); + throw new ProcessException(e); + } + } + + @Override + public String toString() { + return "HiveConnectionPool[id=" + getIdentifier() + "]"; + } + + /** + * Validates that one or more files exist, as specified in a single property. 
+ */ + public static Validator createMultipleFilesExistValidator() { + return new Validator() { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + final String[] files = input.split(","); + for (String filename : files) { + try { + final File file = new File(filename.trim()); + final boolean valid = file.exists() && file.isFile(); + if (!valid) { + final String message = "File " + file + " does not exist or is not a file"; + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build(); + } + } catch (SecurityException e) { + final String message = "Unable to access " + filename + " due to " + e.getMessage(); + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build(); + } + } + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + + }; + } + + @Override + public String getConnectionURL() { + return connectionUrl; + } + + private static class ValidationResources { + private final String configResources; + private final Configuration configuration; + + public ValidationResources(String configResources, Configuration configuration) { + this.configResources = configResources; + this.configuration = configuration; + } + + public String getConfigResources() { + return configResources; + } + + public Configuration getConfiguration() { + return configuration; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java new file mode 100644 
index 0000000..4bd5320 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveDBCPService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp.hive; + + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.dbcp.DBCPService; + +/** + * Definition for Database Connection Pooling Service. + * + */ +@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"}) +@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. 
Connections can be asked from pool and returned after usage.") +public interface HiveDBCPService extends DBCPService { + public String getConnectionURL(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java new file mode 100644 index 0000000..e18e464 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hive; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.processor.AbstractProcessor; + +/** + * An abstract base class for HiveQL processors to share common data, methods, etc. + */ +public abstract class AbstractHiveQLProcessor extends AbstractProcessor { + + public static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("Hive Database Connection Pooling Service") + .description("The Hive Controller Service that is used to obtain connection(s) to the Hive database") + .required(true) + .identifiesControllerService(HiveDBCPService.class) + .build(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java new file mode 100644 index 0000000..27a7f01 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hive; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLNonTransientException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import 
java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SeeAlso(SelectHiveQL.class) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"sql", "hive", "put", "database", "update", "insert"}) +@CapabilityDescription("Executes a HiveQL DDL/DML command (UPDATE, INSERT, e.g.). The content of an incoming FlowFile is expected to be the HiveQL command " + + "to execute. The HiveQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + + "with the naming convention hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is expected to be " + + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "hiveql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized HiveQL statements. The type of each Parameter is specified as an integer " + + "that represents the JDBC Type of the parameter."), + @ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized HiveQL statements. The value of the Parameters are specified as " + + "hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. 
The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.") +}) +public class PutHiveQL extends AbstractHiveQLProcessor { + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("hive-batch-size") + .displayName("Batch Size") + .description("The preferred number of FlowFiles to put to the database in a single transaction") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("hive-charset") + .displayName("Character Set") + .description("Specifies the character set of the record data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the database is successfully updated") + .build(); + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + + "such as an invalid query or an integrity constraint violation") + .build(); + + private static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type"); + private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); + + private final static List<PropertyDescriptor> propertyDescriptors; + private final static Set<Relationship> relationships; + + /* + * Will ensure that the list of property descriptors is build 
only once. + * Will also create a Set of relationships + */ + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(HIVE_DBCP_SERVICE); + _propertyDescriptors.add(BATCH_SIZE); + _propertyDescriptors.add(CHARSET); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + _relationships.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final List<FlowFile> flowFiles = session.get(batchSize); + + if (flowFiles.isEmpty()) { + return; + } + + final long startNanos = System.nanoTime(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); + try (final Connection conn = dbcpService.getConnection()) { + + for (FlowFile flowFile : flowFiles) { + try { + final String hiveQL = getHiveQL(session, flowFile, charset); + final PreparedStatement stmt = conn.prepareStatement(hiveQL); + setParameters(stmt, flowFile.getAttributes()); + + // Execute the statement + stmt.execute(); + + // Emit a Provenance SEND event + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, dbcpService.getConnectionURL(), transmissionMillis, true); + session.transfer(flowFile, REL_SUCCESS); + + } catch (final SQLException e) { + + if (e 
instanceof SQLNonTransientException) { + getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } else { + getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_RETRY); + } + + } + } + } catch (final SQLException sqle) { + // There was a problem getting the connection, yield and retry the flowfiles + getLogger().error("Failed to get Hive connection due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{sqle}); + session.transfer(flowFiles, REL_RETRY); + context.yield(); + } + } + + /** + * Determines the HiveQL statement that should be executed for the given FlowFile + * + * @param session the session that can be used to access the given FlowFile + * @param flowFile the FlowFile whose HiveQL statement should be executed + * @return the HiveQL that is associated with the given FlowFile + */ + private String getHiveQL(final ProcessSession session, final FlowFile flowFile, final Charset charset) { + // Read the HiveQL from the FlowFile's content + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + // Create the PreparedStatement to use for this FlowFile. + return new String(buffer, charset); + } + + + /** + * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes. 
+ * + * @param stmt the statement to set the parameters on + * @param attributes the attributes from which to derive parameter indices, values, and types + * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called + */ + private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException { + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + final String key = entry.getKey(); + final Matcher matcher = HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); + if (matcher.matches()) { + final int parameterIndex = Integer.parseInt(matcher.group(1)); + + final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); + if (!isNumeric) { + throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); + } + + final int jdbcType = Integer.parseInt(entry.getValue()); + final String valueAttrName = "hiveql.args." 
+ parameterIndex + ".value"; + final String parameterValue = attributes.get(valueAttrName); + + try { + setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); + } catch (final NumberFormatException nfe) { + throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); + } + } + } + } + + /** + * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the + * provided PreparedStatement + * + * @param stmt the PreparedStatement to set the parameter on + * @param attrName the name of the attribute that the parameter is coming from - for logging purposes + * @param parameterIndex the index of the HiveQL parameter to set + * @param parameterValue the value of the HiveQL parameter to set + * @param jdbcType the JDBC Type of the HiveQL parameter to set + * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter + */ + private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { + if (parameterValue == null) { + stmt.setNull(parameterIndex, jdbcType); + } else { + try { + switch (jdbcType) { + case Types.BIT: + case Types.BOOLEAN: + stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); + break; + case Types.TINYINT: + stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); + break; + case Types.SMALLINT: + stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); + break; + case Types.INTEGER: + stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); + break; + case Types.BIGINT: + stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); + break; + case Types.REAL: + stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); + break; + case Types.FLOAT: + case Types.DOUBLE: + stmt.setDouble(parameterIndex, 
Double.parseDouble(parameterValue)); + break; + case Types.DECIMAL: + case Types.NUMERIC: + stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); + break; + case Types.DATE: + stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); + break; + case Types.TIME: + stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); + break; + case Types.TIMESTAMP: + stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + stmt.setString(parameterIndex, parameterValue); + break; + default: + stmt.setObject(parameterIndex, parameterValue, jdbcType); + break; + } + } catch (SQLException e) { + // Log which attribute/parameter had an error, then rethrow to be handled at the top level + getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, e); + throw e; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/59bc8264/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java new file mode 100644 index 0000000..1b65af6 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hive; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.hive.HiveJdbcCommon; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; 
+import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@EventDriven +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hive", "sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro or CSV format." + + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. FlowFile attribute 'selecthiveql.row.count' indicates how many rows were selected.") +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."), + @WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."), + @WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query.") +}) +public class SelectHiveQL extends AbstractHiveQLProcessor { + + public static final String RESULT_ROW_COUNT = "selecthiveql.row.count"; + + protected static final String AVRO = "Avro"; + protected static final String CSV = "CSV"; + + public static final String AVRO_MIME_TYPE = "application/avro-binary"; + public static final String CSV_MIME_TYPE = "text/csv"; + + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from HiveQL query result set.") + .build(); + public static final Relationship REL_FAILURE = new 
Relationship.Builder() + .name("failure") + .description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") + .build(); + + + public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder() + .name("hive-query") + .displayName("HiveQL Select Query") + .description("HiveQL SELECT query to execute") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor HIVEQL_OUTPUT_FORMAT = new PropertyDescriptor.Builder() + .name("hive-output-format") + .displayName("Output Format") + .description("How to represent the records coming from Hive (Avro, CSV, e.g.)") + .required(true) + .allowableValues(AVRO, CSV) + .defaultValue(AVRO) + .expressionLanguageSupported(false) + .build(); + + private final static List<PropertyDescriptor> propertyDescriptors; + private final static Set<Relationship> relationships; + + /* + * Will ensure that the list of property descriptors is built only once. 
+ * Will also create a Set of relationships + */ + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(HIVE_DBCP_SERVICE); + _propertyDescriptors.add(HIVEQL_SELECT_QUERY); + _propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile fileToProcess = null; + if (context.hasIncomingConnection()) { + fileToProcess = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. 
+ if (fileToProcess == null && context.hasNonLoopConnection()) { + return; + } + } + + final ProcessorLog logger = getLogger(); + final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); + final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); + final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue(); + final StopWatch stopWatch = new StopWatch(true); + + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + final LongHolder nrOfRows = new LongHolder(0L); + if (fileToProcess == null) { + fileToProcess = session.create(); + } + fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + logger.debug("Executing query {}", new Object[]{selectQuery}); + final ResultSet resultSet = st.executeQuery(selectQuery); + if (AVRO.equals(outputFormat)) { + nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out)); + } else if (CSV.equals(outputFormat)) { + nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out)); + } else { + nrOfRows.set(0L); + throw new ProcessException("Unsupported output format: " + outputFormat); + } + } catch (final SQLException e) { + throw new ProcessException(e); + } + } + }); + + // set attribute how many rows were selected + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString()); + + // Set MIME type on output document and add extension + if (AVRO.equals(outputFormat)) { + fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE); + fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.FILENAME.key(), fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) + ".avro"); + } else if (CSV.equals(outputFormat)) { + fileToProcess = 
session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE); + fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.FILENAME.key(), fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) + ".csv"); + } + + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{fileToProcess, nrOfRows.get()}); + + if (context.hasIncomingConnection()) { + // If the flow file came from an incoming connection, issue a Modify Content provenance event + + session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", + stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } else { + // If we created a flow file from rows received from Hive, issue a Receive provenance event + session.getProvenanceReporter().receive(fileToProcess, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } + session.transfer(fileToProcess, REL_SUCCESS); + } catch (final ProcessException | SQLException e) { + if (fileToProcess == null) { + // This can happen if any exceptions occur while setting up the connection, statement, etc. + logger.error("Unable to execute HiveQL select query {} due to {}. No FlowFile to route to failure", + new Object[]{selectQuery, e}); + context.yield(); + } else { + if (context.hasIncomingConnection()) { + logger.error("Unable to execute HiveQL select query {} for {} due to {}; routing to failure", + new Object[]{selectQuery, fileToProcess, e}); + fileToProcess = session.penalize(fileToProcess); + } else { + logger.error("Unable to execute HiveQL select query {} due to {}; routing to failure", + new Object[]{selectQuery, e}); + context.yield(); + } + session.transfer(fileToProcess, REL_FAILURE); + } + } + } +}