Repository: nifi
Updated Branches:
  refs/heads/master 833601406 -> 106b0fa0f


http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
new file mode 100644
index 0000000..c048c02
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.hive;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.FieldAssembler;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.sql.Types.ARRAY;
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.BLOB;
+import static java.sql.Types.BOOLEAN;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.DATE;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.FLOAT;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NUMERIC;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.REAL;
+import static java.sql.Types.ROWID;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+
+/**
+ * JDBC / HiveQL common functions.
+ */
+public class HiveJdbcCommon {
+
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
+        return convertToAvroStream(rs, outStream, null, null);
+    }
+
+
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
+            throws SQLException, IOException {
+        final Schema schema = createSchema(rs, recordName);
+        final GenericRecord rec = new GenericData.Record(schema);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outStream);
+
+            final ResultSetMetaData meta = rs.getMetaData();
+            final int nrOfColumns = meta.getColumnCount();
+            long nrOfRows = 0;
+            while (rs.next()) {
+                if (callback != null) {
+                    callback.processRow(rs);
+                }
+                for (int i = 1; i <= nrOfColumns; i++) {
+                    final int javaSqlType = meta.getColumnType(i);
+                    final Object value = rs.getObject(i);
+
+                    if (value == null) {
+                        rec.put(i - 1, null);
+
+                    } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) {
+                        // bytes require slightly different handling
+                        byte[] bytes = rs.getBytes(i);
+                        ByteBuffer bb = ByteBuffer.wrap(bytes);
+                        rec.put(i - 1, bb);
+
+                    } else if (value instanceof Byte) {
+                        // tinyint(1) type is returned by the JDBC driver as java.sql.Types.TINYINT,
+                        // but the value is returned by JDBC as java.lang.Byte
+                        // (at least the H2 JDBC driver works this way).
+                        // A direct put to the Avro record results in:
+                        // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+                        rec.put(i - 1, ((Byte) value).intValue());
+
+                    } else if (value instanceof BigDecimal || value instanceof BigInteger) {
+                        // Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
+                        rec.put(i - 1, value.toString());
+
+                    } else if (value instanceof Number || value instanceof Boolean) {
+                        rec.put(i - 1, value);
+
+                    } else {
+                        // The different types that we support are numbers (int, long, double, float),
+                        // as well as boolean values and Strings. Since Avro doesn't provide
+                        // timestamp types, we want to convert those to Strings. So we will cast anything other
+                        // than numbers or booleans to strings by using the toString() method.
+                        rec.put(i - 1, value.toString());
+                    }
+                }
+                dataFileWriter.append(rec);
+                nrOfRows += 1;
+            }
+
+            return nrOfRows;
+        }
+    }
+
+    public static Schema createSchema(final ResultSet rs) throws SQLException {
+        return createSchema(rs, null);
+    }
+
+    /**
+     * Creates an Avro schema from a result set. If the table/record name is known a priori and provided, use that as a
+     * fallback for the record name if it cannot be retrieved from the result set, and finally fall back to a default value.
+     *
+     * @param rs         The result set to convert to Avro
+     * @param recordName The a priori record name to use if it cannot be determined from the result set.
+     * @return A Schema object representing the result set converted to an Avro record
+     * @throws SQLException if any error occurs during conversion
+     */
+    public static Schema createSchema(final ResultSet rs, String recordName) throws SQLException {
+        final ResultSetMetaData meta = rs.getMetaData();
+        final int nrOfColumns = meta.getColumnCount();
+        String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName;
+        try {
+            if (nrOfColumns > 0) {
+                // Hive JDBC doesn't support getTableName, instead it returns table.column for column name. Grab the table name from the first column
+                String firstColumnNameFromMeta = meta.getColumnName(1);
+                int tableNameDelimiter = firstColumnNameFromMeta.lastIndexOf(".");
+                if (tableNameDelimiter > -1) {
+                    String tableNameFromMeta = firstColumnNameFromMeta.substring(0, tableNameDelimiter);
+                    if (!StringUtils.isBlank(tableNameFromMeta)) {
+                        tableName = tableNameFromMeta;
+                    }
+                }
+            }
+        } catch (SQLException se) {
+            // Not all drivers support getTableName, so just use the previously-set default
+        }
+
+        final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
+
+        /**
+         * Some missing Avro types - Decimal, Date types. May need some additional work.
+         */
+        for (int i = 1; i <= nrOfColumns; i++) {
+            String columnNameFromMeta = meta.getColumnName(i);
+            // Hive returns table.column for column name. Grab the column name as the string after the last period
+            int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
+            String columnName = columnNameFromMeta.substring(columnNameDelimiter + 1);
+            switch (meta.getColumnType(i)) {
+                case CHAR:
+                case LONGNVARCHAR:
+                case LONGVARCHAR:
+                case NCHAR:
+                case NVARCHAR:
+                case VARCHAR:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    break;
+
+                case BIT:
+                case BOOLEAN:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+                    break;
+
+                case INTEGER:
+                    // Default to signed type unless otherwise noted. Some JDBC drivers don't implement isSigned()
+                    boolean signedType = true;
+                    try {
+                        signedType = meta.isSigned(i);
+                    } catch (SQLException se) {
+                        // Use signed types as default
+                    }
+                    if (signedType) {
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
+                    } else {
+                        builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
+                    }
+                    break;
+
+                case SMALLINT:
+                case TINYINT:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
+                    break;
+
+                case BIGINT:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
+                    break;
+
+                // java.sql.RowId is an interface whose implementation seems to be
+                // database-specific, so convert it to a String
+                case ROWID:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    break;
+
+                case FLOAT:
+                case REAL:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
+                    break;
+
+                case DOUBLE:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
+                    break;
+
+                // No directly suitable Avro type; fall back to String for now
+                case DECIMAL:
+                case NUMERIC:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    break;
+
+                // No directly suitable Avro type; fall back to String for now
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    break;
+
+                case BINARY:
+                case VARBINARY:
+                case LONGVARBINARY:
+                case ARRAY:
+                case BLOB:
+                case CLOB:
+                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
+                    break;
+
+
+                default:
+                    throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type");
+            }
+        }
+
+        return builder.endRecord();
+    }
+
+    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
+        return convertToCsvStream(rs, outStream, null, null);
+    }
+
+    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
+            throws SQLException, IOException {
+
+        final ResultSetMetaData meta = rs.getMetaData();
+        final int nrOfColumns = meta.getColumnCount();
+        List<String> columnNames = new ArrayList<>(nrOfColumns);
+
+        for (int i = 1; i <= nrOfColumns; i++) {
+            String columnNameFromMeta = meta.getColumnName(i);
+            // Hive returns table.column for column name. Grab the column name as the string after the last period
+            int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
+            columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
+        }
+
+        // Write column names as header row
+        outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
+        outStream.write("\n".getBytes(StandardCharsets.UTF_8));
+
+        // Iterate over the rows
+        long nrOfRows = 0;
+        while (rs.next()) {
+            if (callback != null) {
+                callback.processRow(rs);
+            }
+            List<String> rowValues = new ArrayList<>(nrOfColumns);
+            for (int i = 1; i <= nrOfColumns; i++) {
+                final int javaSqlType = meta.getColumnType(i);
+                final Object value = rs.getObject(i);
+
+                switch (javaSqlType) {
+                    case CHAR:
+                    case LONGNVARCHAR:
+                    case LONGVARCHAR:
+                    case NCHAR:
+                    case NVARCHAR:
+                    case VARCHAR:
+                        rowValues.add("\"" + StringEscapeUtils.escapeCsv(rs.getString(i)) + "\"");
+                        break;
+                    default:
+                        rowValues.add(value.toString());
+                }
+            }
+            // Write row values
+            outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
+            outStream.write("\n".getBytes(StandardCharsets.UTF_8));
+            nrOfRows++;
+        }
+        return nrOfRows;
+    }
+
+    /**
+     * An interface for callback methods which allows processing of a row during the convertToXYZStream() processing.
+     * <b>IMPORTANT:</b> The callback should only operate on the row currently pointed at by the ResultSet reference.
+     * Advancing the cursor (by calling next(), for example) can cause rows to be skipped during Avro transformation.
+     */
+    public interface ResultSetRowCallback {
+        void processRow(ResultSet resultSet) throws IOException;
+    }
+}
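
For context, a minimal sketch of how these helpers can be driven from plain
JDBC code (the Derby URL, the persons table, and the output path are
illustrative assumptions, not part of this commit):

    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    import org.apache.nifi.util.hive.HiveJdbcCommon;

    public class HiveJdbcCommonSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:derby:target/db;create=true");
                 Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("SELECT * FROM persons");
                 OutputStream out = new FileOutputStream("persons.avro")) {
                // The callback sees each row before conversion; it must not advance
                // the cursor, or rows will be skipped.
                final long rows = HiveJdbcCommon.convertToAvroStream(rs, out, "persons", resultSet -> { });
                System.out.println("Wrote " + rows + " Avro records");
            }
        }
    }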

http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..6a2936e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.dbcp.hive.HiveConnectionPool
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..b218214
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.hive.SelectHiveQL
+org.apache.nifi.processors.hive.PutHiveQL
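
These META-INF/services entries are standard java.util.ServiceLoader manifests:
each line names an implementation class, and NiFi's extension loading discovers
the bundled processors and controller services through them. A simplified
sketch of the mechanism (the narClassLoader variable is an assumption standing
in for the NAR's class loader):

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    // Each manifest line names an implementation; ServiceLoader instantiates
    // them through the supplied class loader.
    for (final Processor p : ServiceLoader.load(Processor.class, narClassLoader)) {
        System.out.println("Discovered processor: " + p.getClass().getName());
    }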

http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
new file mode 100644
index 0000000..b46b847
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.dbcp.hive.HiveDBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestPutHiveQL {
+    private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
+    private static final String createPersonsAutoId = "CREATE TABLE PERSONS (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))";
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    @Test
+    public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+
+        runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
+        runner.run();
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("George", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
+        runner.run();
+
+        runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
+        runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
+    }
+
+
+    @Test
+    public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        goodAttributes.put("hiveql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("hiveql.args.1.type", String.valueOf(Types.VARCHAR));
+        badAttributes.put("hiveql.args.1.value", "hello");
+
+        final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.run();
+
+        runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
+        runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
+    }
+
+
+    @Test
+    public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        goodAttributes.put("hiveql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        badAttributes.put("hiveql.args.1.value", "9999");
+
+        final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.run();
+
+        runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
+        runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertTrue(rs.next());
+                assertTrue(rs.next());
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary key, name varchar(100), code bigint)");
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", "-5");
+        attributes.put("hiveql.args.1.value", "84");
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+
+        runner.clearTransferState();
+
+        attributes.clear();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.1.value", "George");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.2.value", "1");
+
+        runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("George", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+            "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should fail because of the semicolon
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_FAILURE, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(0, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+            "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should fail because of the semicolon
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_FAILURE, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final DBCPService service = new SQLExceptionService(null);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+            "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should be routed to retry because the connection's prepareStatement throws a SQLException
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1);
+    }
+
+    /**
+     * Simple implementation only for testing purposes
+     */
+    private static class MockDBCPService extends AbstractControllerService implements HiveDBCPService {
+        private final String dbLocation;
+
+        MockDBCPService(final String dbLocation) {
+            this.dbLocation = dbLocation;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+            } catch (final Exception e) {
+                e.printStackTrace();
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+
+        @Override
+        public String getConnectionURL() {
+            return "jdbc:derby:" + dbLocation + ";create=true";
+        }
+    }
+
+    /**
+     * Simple implementation only for testing purposes
+     */
+    private static class SQLExceptionService extends AbstractControllerService implements HiveDBCPService {
+        private final HiveDBCPService service;
+        private int allowedBeforeFailure = 0;
+        private int successful = 0;
+
+        SQLExceptionService(final HiveDBCPService service) {
+            this.service = service;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                if (++successful > allowedBeforeFailure) {
+                    final Connection conn = Mockito.mock(Connection.class);
+                    Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException"));
+                    return conn;
+                } else {
+                    return service.getConnection();
+                }
+            } catch (final Exception e) {
+                e.printStackTrace();
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+
+        @Override
+        public String getConnectionURL() {
+            return service.getConnectionURL();
+        }
+    }
+}
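
The tests above rely on PutHiveQL's positional-parameter convention: for the
Nth ? placeholder in the statement, the attribute hiveql.args.N.type carries a
java.sql.Types constant and hiveql.args.N.value carries the value to bind. A
minimal sketch of building such an attribute map (the hiveArg helper is
hypothetical, for illustration only):

    import java.sql.Types;
    import java.util.HashMap;
    import java.util.Map;

    class HiveArgsSketch {
        // Hypothetical helper: record the type and value for the nth placeholder.
        static void hiveArg(final Map<String, String> attrs, final int n, final int sqlType, final String value) {
            attrs.put("hiveql.args." + n + ".type", String.valueOf(sqlType));
            attrs.put("hiveql.args." + n + ".value", value);
        }

        // Binds (1, 'Mark', 84) to "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"
        static Map<String, String> example() {
            final Map<String, String> attrs = new HashMap<>();
            hiveArg(attrs, 1, Types.INTEGER, "1");
            hiveArg(attrs, 2, Types.VARCHAR, "Mark");
            hiveArg(attrs, 3, Types.INTEGER, "84");
            return attrs;
        }
    }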

http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
new file mode 100644
index 0000000..5386030
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.dbcp.hive.HiveDBCPService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.nifi.processors.hive.SelectHiveQL.HIVEQL_OUTPUT_FORMAT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSelectHiveQL {
+
+    private static final Logger LOGGER;
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.SelectHiveQL", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.TestSelectHiveQL", "debug");
+        LOGGER = LoggerFactory.getLogger(TestSelectHiveQL.class);
+    }
+
+    final static String DB_LOCATION = "target/db";
+
+    final static String QUERY_WITH_EL = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + " from persons PER"
+            + " where PER.ID > ${person.id}";
+
+    final static String QUERY_WITHOUT_EL = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + " from persons PER"
+            + " where PER.ID > 10";
+
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(SelectHiveQL.class);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(SelectHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+    }
+
+    @Test
+    public void testIncomingConnectionWithNoFlowFile() throws InitializationException {
+        runner.setIncomingConnection(true);
+        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM persons");
+        runner.run();
+        runner.assertTransferCount(SelectHiveQL.REL_SUCCESS, 0);
+        runner.assertTransferCount(SelectHiveQL.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+        runner.setIncomingConnection(false);
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro");
+    }
+
+    @Test
+    public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
+    }
+
+
+    @Test
+    public void testWithNullIntColumn() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHiveQL.RESULT_ROW_COUNT, "2");
+    }
+
+    @Test
+    public void testWithSqlException() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NO_ROWS");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statement that will generate an error (val1 does not exist, for example)
+        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
+    }
+
+
+    @Test
+    public void invokeOnTriggerWithCsv()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.CSV);
+    }
+
+    @Test
+    public void invokeOnTriggerWithAvro()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.AVRO);
+    }
+
+    public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection();
+        final Statement stmt = con.createStatement();
+        try {
+            stmt.execute("drop table persons");
+        } catch (final SQLException sqle) {
+            // Nothing to do here, the table didn't exist
+        }
+
+        stmt.execute("create table persons (id integer, name varchar(100), code integer)");
+        Random rng = new Random(53496);
+        final int nrOfRows = 100;
+        stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
+        for (int i = 2; i <= nrOfRows; i++) {
+            stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
+        }
+        LOGGER.info("test data loaded");
+
+        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, query);
+        runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+
+        if (incomingFlowFile) {
+            // incoming FlowFile content is not used, but attributes are used
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("person.id", "10");
+            runner.enqueue("Hello".getBytes(), attributes);
+        }
+
+        runner.setIncomingConnection(incomingFlowFile);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS);
+        MockFlowFile flowFile = flowfiles.get(0);
+        final InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+        long recordsFromStream = 0;
+        if (SelectHiveQL.AVRO.equals(outputFormat)) {
+            assertEquals(SelectHiveQL.AVRO_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+            final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+            try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+                GenericRecord record = null;
+                while (dataFileReader.hasNext()) {
+                    // Reuse record object by passing it to next(). This saves us from
+                    // allocating and garbage collecting many objects for files with
+                    // many items.
+                    record = dataFileReader.next(record);
+                    recordsFromStream++;
+                }
+            }
+        } else {
+            assertEquals(SelectHiveQL.CSV_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+            BufferedReader br = new BufferedReader(new InputStreamReader(in));
+
+            String headerRow = br.readLine();
+            // Derby capitalizes column names
+            assertEquals("PERSONID,PERSONNAME,PERSONCODE", headerRow);
+
+            // Validate rows
+            String line;
+            while ((line = br.readLine()) != null) {
+                recordsFromStream++;
+                String[] values = line.split(",");
+                assertEquals(3, values.length);
+                // Assert the name has been quoted
+                assertTrue(values[1].startsWith("\""));
+                assertTrue(values[1].endsWith("\""));
+            }
+        }
+        assertEquals(nrOfRows - 10, recordsFromStream);
+        assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
+    }
+
+    /**
+     * Simple implementation only for SelectHiveQL processor testing.
+     */
+    private class DBCPServiceSimpleImpl extends AbstractControllerService implements HiveDBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+                return con;
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+
+        @Override
+        public String getConnectionURL() {
+            return "jdbc:derby:" + DB_LOCATION + ";create=true";
+        }
+    }
+
+}
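
QUERY_WITH_EL above exercises NiFi Expression Language: the incoming FlowFile
carries a person.id attribute, and ${person.id} is resolved against it before
the query runs. A sketch of the usual resolution pattern inside a processor's
onTrigger (an assumed pattern, not code shown in this commit):

    // context is the ProcessContext, flowFile the incoming FlowFile.
    final String resolvedQuery = context.getProperty(SelectHiveQL.HIVEQL_SELECT_QUERY)
            .evaluateAttributeExpressions(flowFile)   // ${person.id} -> "10"
            .getValue();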

http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/pom.xml
new file mode 100644
index 0000000..fc7de98
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hive-bundle</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-hive-processors</module>
+        <module>nifi-hive-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 7ed2da8..08a5c3d 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -59,7 +59,8 @@
         <module>nifi-jms-bundle</module>
         <module>nifi-cassandra-bundle</module>
         <module>nifi-spring-bundle</module>
+        <module>nifi-hive-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -144,4 +145,4 @@
             </dependency>
         </dependencies>
     </dependencyManagement>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9acd949..aca6b02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1122,6 +1122,12 @@ language governing permissions and limitations under the License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-hive-nar</artifactId>
+                <version>1.0.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>1.0.0-SNAPSHOT</version>
             </dependency>
