Repository: nifi Updated Branches: refs/heads/master 833601406 -> 106b0fa0f
http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java new file mode 100644 index 0000000..c048c02 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.hive; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.SchemaBuilder.FieldAssembler; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +/** + * JDBC / HiveQL common functions. + */ +public class HiveJdbcCommon { + + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { + return convertToAvroStream(rs, outStream, null, null); + } + + + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) + throws SQLException, IOException { + final Schema schema = createSchema(rs, recordName); + final GenericRecord rec = new GenericData.Record(schema); + + final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(schema, outStream); + + final ResultSetMetaData meta = rs.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + long nrOfRows = 0; + while (rs.next()) { + if (callback != null) { + callback.processRow(rs); + } + for (int i = 1; i <= nrOfColumns; i++) { + final int javaSqlType = meta.getColumnType(i); + final Object value = rs.getObject(i); + + if (value == null) { + rec.put(i - 1, null); + + } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) { + // bytes requires little bit different handling + byte[] bytes = rs.getBytes(i); + ByteBuffer bb = ByteBuffer.wrap(bytes); + rec.put(i - 1, bb); + + } else if (value instanceof Byte) { + // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT + // But value is returned by JDBC as java.lang.Byte + // (at least H2 JDBC works this way) + // direct put to avro record results: + // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte + rec.put(i - 1, ((Byte) value).intValue()); + + } else if (value instanceof BigDecimal || value instanceof BigInteger) { + // Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38" + rec.put(i - 1, value.toString()); + + } else if (value instanceof Number || value instanceof Boolean) { + rec.put(i - 1, value); + + } else { + // The different types that we support are numbers (int, long, double, float), + // as well as boolean values and Strings. Since Avro doesn't provide + // timestamp types, we want to convert those to Strings. So we will cast anything other + // than numbers or booleans to strings by using the toString() method. + rec.put(i - 1, value.toString()); + } + } + dataFileWriter.append(rec); + nrOfRows += 1; + } + + return nrOfRows; + } + } + + public static Schema createSchema(final ResultSet rs) throws SQLException { + return createSchema(rs, null); + } + + /** + * Creates an Avro schema from a result set. If the table/record name is known a priori and provided, use that as a + * fallback for the record name if it cannot be retrieved from the result set, and finally fall back to a default value. + * + * @param rs The result set to convert to Avro + * @param recordName The a priori record name to use if it cannot be determined from the result set. + * @return A Schema object representing the result set converted to an Avro record + * @throws SQLException if any error occurs during conversion + */ + public static Schema createSchema(final ResultSet rs, String recordName) throws SQLException { + final ResultSetMetaData meta = rs.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName; + try { + if (nrOfColumns > 0) { + // Hive JDBC doesn't support getTableName, instead it returns table.column for column name. Grab the table name from the first column + String firstColumnNameFromMeta = meta.getColumnName(1); + int tableNameDelimiter = firstColumnNameFromMeta.lastIndexOf("."); + if (tableNameDelimiter > -1) { + String tableNameFromMeta = firstColumnNameFromMeta.substring(0, tableNameDelimiter); + if (!StringUtils.isBlank(tableNameFromMeta)) { + tableName = tableNameFromMeta; + } + } + } + } catch (SQLException se) { + // Not all drivers support getTableName, so just use the previously-set default + } + + final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields(); + + /** + * Some missing Avro types - Decimal, Date types. May need some additional work. + */ + for (int i = 1; i <= nrOfColumns; i++) { + String columnNameFromMeta = meta.getColumnName(i); + // Hive returns table.column for column name. Grab the column name as the string after the last period + int columnNameDelimiter = columnNameFromMeta.lastIndexOf("."); + String columnName = columnNameFromMeta.substring(columnNameDelimiter + 1); + switch (meta.getColumnType(i)) { + case CHAR: + case LONGNVARCHAR: + case LONGVARCHAR: + case NCHAR: + case NVARCHAR: + case VARCHAR: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + break; + + case BIT: + case BOOLEAN: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault(); + break; + + case INTEGER: + // Default to signed type unless otherwise noted. Some JDBC drivers don't implement isSigned() + boolean signedType = true; + try { + signedType = meta.isSigned(i); + } catch (SQLException se) { + // Use signed types as default + } + if (signedType) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); + } else { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); + } + break; + + case SMALLINT: + case TINYINT: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); + break; + + case BIGINT: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); + break; + + // java.sql.RowId is interface, is seems to be database + // implementation specific, let's convert to String + case ROWID: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + break; + + case FLOAT: + case REAL: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault(); + break; + + case DOUBLE: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault(); + break; + + // Did not find direct suitable type, need to be clarified!!!! + case DECIMAL: + case NUMERIC: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + break; + + // Did not find direct suitable type, need to be clarified!!!! + case DATE: + case TIME: + case TIMESTAMP: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + break; + + case BINARY: + case VARBINARY: + case LONGVARBINARY: + case ARRAY: + case BLOB: + case CLOB: + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault(); + break; + + + default: + throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type"); + } + } + + return builder.endRecord(); + } + + public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { + return convertToCsvStream(rs, outStream, null, null); + } + + public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) + throws SQLException, IOException { + + final ResultSetMetaData meta = rs.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + List<String> columnNames = new ArrayList<>(nrOfColumns); + + for (int i = 1; i <= nrOfColumns; i++) { + String columnNameFromMeta = meta.getColumnName(i); + // Hive returns table.column for column name. Grab the column name as the string after the last period + int columnNameDelimiter = columnNameFromMeta.lastIndexOf("."); + columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1)); + } + + // Write column names as header row + outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8)); + outStream.write("\n".getBytes(StandardCharsets.UTF_8)); + + // Iterate over the rows + long nrOfRows = 0; + while (rs.next()) { + if (callback != null) { + callback.processRow(rs); + } + List<String> rowValues = new ArrayList<>(nrOfColumns); + for (int i = 1; i <= nrOfColumns; i++) { + final int javaSqlType = meta.getColumnType(i); + final Object value = rs.getObject(i); + + switch (javaSqlType) { + case CHAR: + case LONGNVARCHAR: + case LONGVARCHAR: + case NCHAR: + case NVARCHAR: + case VARCHAR: + rowValues.add("\"" + StringEscapeUtils.escapeCsv(rs.getString(i)) + "\""); + break; + default: + rowValues.add(value.toString()); + } + } + // Write row values + outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8)); + outStream.write("\n".getBytes(StandardCharsets.UTF_8)); + nrOfRows++; + } + return nrOfRows; + } + + /** + * An interface for callback methods which allows processing of a row during the convertToXYZStream() processing. + * <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference. + * Advancing the cursor (e.g.) can cause rows to be skipped during Avro transformation. + */ + public interface ResultSetRowCallback { + void processRow(ResultSet resultSet) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..6a2936e --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.dbcp.hive.HiveConnectionPool \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..b218214 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.hive.SelectHiveQL +org.apache.nifi.processors.hive.PutHiveQL http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java new file mode 100644 index 0000000..b46b847 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java @@ -0,0 +1,560 @@ +package org.apache.nifi.processors.hive;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestPutHiveQL { + private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + private static final String createPersonsAutoId = "CREATE TABLE PERSONS (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); + runner.run(); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + runner.run(); + + runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1); + runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("hiveql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("hiveql.args.1.type", String.valueOf(Types.VARCHAR)); + badAttributes.put("hiveql.args.1.value", "hello"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1); + runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("hiveql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("hiveql.args.1.value", "9999"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3); + runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertTrue(rs.next()); + assertTrue(rs.next()); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary key, name varchar(100), code bigint)"); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", "-5"); + attributes.put("hiveql.args.1.value", "84"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.clearTransferState(); + + attributes.clear(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.1.value", "George"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.2.value", "1"); + + runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(0, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final DBCPService service = new SQLExceptionService(null); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1); + } + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements HiveDBCPService { + private final String dbLocation; + + MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + + @Override + public String getConnectionURL() { + return "jdbc:derby:" + dbLocation + ";create=true"; + } + } + + /** + * Simple implementation only for testing purposes + */ + private static class SQLExceptionService extends AbstractControllerService implements HiveDBCPService { + private final HiveDBCPService service; + private int allowedBeforeFailure = 0; + private int successful = 0; + + SQLExceptionService(final HiveDBCPService service) { + this.service = service; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + if (++successful > allowedBeforeFailure) { + final Connection conn = Mockito.mock(Connection.class); + Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); + return conn; + } else { + return service.getConnection(); + } + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + + @Override + public String getConnectionURL() { + return service.getConnectionURL(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java new file mode 100644 index 0000000..5386030 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hive; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.dbcp.hive.HiveDBCPService; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.apache.nifi.processors.hive.SelectHiveQL.HIVEQL_OUTPUT_FORMAT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSelectHiveQL { + + private static final Logger LOGGER; + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.SelectHiveQL", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.TestSelectHiveQL", "debug"); + LOGGER = LoggerFactory.getLogger(TestSelectHiveQL.class); + } + + final static String DB_LOCATION = "target/db"; + + final static String QUERY_WITH_EL = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + " from persons PER" + + " where PER.ID > ${person.id}"; + + final static String QUERY_WITHOUT_EL = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + " from persons PER" + + " where PER.ID > 10"; + + + @BeforeClass + public static void setupClass() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map<String, String> dbcpProperties = new HashMap<>(); + + runner = TestRunners.newTestRunner(SelectHiveQL.class); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(SelectHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + } + + @Test + public void testIncomingConnectionWithNoFlowFile() throws InitializationException { + runner.setIncomingConnection(true); + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM persons"); + runner.run(); + runner.assertTransferCount(SelectHiveQL.REL_SUCCESS, 0); + runner.assertTransferCount(SelectHiveQL.REL_FAILURE, 0); + } + + @Test + public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { + runner.setIncomingConnection(false); + invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro"); + } + + @Test + public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITH_EL, true, "Avro"); + } + + + @Test + public void testWithNullIntColumn() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(false); + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT"); + runner.run(); + + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHiveQL.RESULT_ROW_COUNT, "2"); + } + + @Test + public void testWithSqlException() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NO_ROWS"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NO_ROWS (id integer)"); + + runner.setIncomingConnection(false); + // Try a valid SQL statment that will generate an error (val1 does not exist, e.g.) + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS"); + runner.run(); + + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + } + + + @Test + public void invokeOnTriggerWithCsv() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.CSV); + } + + @Test + public void invokeOnTriggerWithAvro() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.AVRO); + } + + public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat) + throws InitializationException, ClassNotFoundException, SQLException, IOException { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection(); + final Statement stmt = con.createStatement(); + try { + stmt.execute("drop table persons"); + } catch (final SQLException sqle) { + // Nothing to do here, the table didn't exist + } + + stmt.execute("create table persons (id integer, name varchar(100), code integer)"); + Random rng = new Random(53496); + final int nrOfRows = 100; + stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")"); + for (int i = 2; i <= nrOfRows; i++) { + stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")"); + } + LOGGER.info("test data loaded"); + + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, query); + runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat); + + if (incomingFlowFile) { + // incoming FlowFile content is not used, but attributes are used + final Map<String, String> attributes = new HashMap<>(); + attributes.put("person.id", "10"); + runner.enqueue("Hello".getBytes(), attributes); + } + + runner.setIncomingConnection(incomingFlowFile); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1); + + final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS); + MockFlowFile flowFile = flowfiles.get(0); + final InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); + long recordsFromStream = 0; + if (SelectHiveQL.AVRO.equals(outputFormat)) { + assertEquals(SelectHiveQL.AVRO_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream++; + } + } + } else { + assertEquals(SelectHiveQL.CSV_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + + String headerRow = br.readLine(); + // Derby capitalizes column names + assertEquals("PERSONID,PERSONNAME,PERSONCODE", headerRow); + + // Validate rows + String line; + while ((line = br.readLine()) != null) { + recordsFromStream++; + String[] values = line.split(","); + assertEquals(3, values.length); + // Assert the name has been quoted + assertTrue(values[1].startsWith("\"")); + assertTrue(values[1].endsWith("\"")); + } + } + assertEquals(nrOfRows - 10, recordsFromStream); + assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT))); + } + + /** + * Simple implementation only for SelectHiveQL processor testing. + */ + private class DBCPServiceSimpleImpl extends AbstractControllerService implements HiveDBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + + @Override + public String getConnectionURL() { + return "jdbc:derby:" + DB_LOCATION + ";create=true"; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/nifi-hive-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/pom.xml new file mode 100644 index 0000000..fc7de98 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-hive-bundle</artifactId> + <version>1.0.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-hive-processors</module> + <module>nifi-hive-nar</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 7ed2da8..08a5c3d 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -59,7 +59,8 @@ <module>nifi-jms-bundle</module> <module>nifi-cassandra-bundle</module> <module>nifi-spring-bundle</module> - </modules> + <module>nifi-hive-bundle</module> + </modules> <dependencyManagement> <dependencies> <dependency> @@ -144,4 +145,4 @@ </dependency> </dependencies> </dependencyManagement> -</project> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/106b0fa0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9acd949..aca6b02 100644 --- a/pom.xml +++ b/pom.xml @@ -1122,6 +1122,12 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hive-nar</artifactId> + <version>1.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>