Repository: nifi Updated Branches: refs/heads/master b7272e3f3 -> 82ac81553
NIFI-1706: Extend QueryDatabaseTable to support arbitrary queries - Only include Maximum Value columns in the type map. - Squashed commits in the previous PR - Rebased against the latest master - Added stop method to GenerateTableFetch so that it refreshes the column type map when it gets restarted - Fixed whitespace around if/for statements - Updated expressionLanguageSupported value because it was not auto-merged correctly This closes #2618. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/82ac8155 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/82ac8155 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/82ac8155 Branch: refs/heads/master Commit: 82ac815536d53c00f848f2eae79474035a9eb126 Parents: b7272e3 Author: patricker <[email protected]> Authored: Tue Sep 19 13:50:06 2017 +0800 Committer: Koji Kawamura <[email protected]> Committed: Fri Apr 13 16:30:13 2018 +0900 ---------------------------------------------------------------------- .../AbstractDatabaseFetchProcessor.java | 52 +++++- .../processors/standard/GenerateTableFetch.java | 7 + .../processors/standard/QueryDatabaseTable.java | 29 ++- .../standard/QueryDatabaseTableTest.java | 183 +++++++++++++++++++ 4 files changed, 264 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 8c22fed..c7bad42 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -44,6 +44,7 @@ import java.text.DecimalFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -152,12 +153,22 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact public static final PropertyDescriptor WHERE_CLAUSE = new PropertyDescriptor.Builder() .name("db-fetch-where-clause") .displayName("Additional WHERE clause") - .description("A custom clause to be added in the WHERE condition when generating SQL requests.") + .description("A custom clause to be added in the WHERE condition when building SQL queries.") .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor SQL_QUERY = new PropertyDescriptor.Builder() + .name("db-fetch-sql-query") + .displayName("Custom Query") + .description("A custom SQL query used to retrieve data. Instead of building a SQL query from " + + "other properties, this query will be wrapped as a sub-query. 
Query must have no ORDER BY statement.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected List<PropertyDescriptor> propDescriptors; // The delimiter to use when referencing qualified names (such as table@!@column in the state map) @@ -246,6 +257,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact // Try to fill the columnTypeMap with the types of the desired max-value columns final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); try (final Connection con = dbcpService.getConnection(); @@ -254,7 +266,17 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact // Try a query that returns no rows, for the purposes of getting metadata about the columns. 
It is possible // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read // approach as in Apache Drill - String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null); + String query; + + if (StringUtils.isEmpty(sqlQuery)) { + query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null); + } else { + StringBuilder sbQuery = getWrappedQuery(sqlQuery, tableName); + sbQuery.append(" WHERE 1=0"); + + query = sbQuery.toString(); + } + ResultSet resultSet = st.executeQuery(query); ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); int numCols = resultSetMetaData.getColumnCount(); @@ -262,12 +284,34 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact if (shouldCleanCache) { columnTypeMap.clear(); } + + final List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.toLowerCase().split(",")); + final List<String> maxValueQualifiedColumnNameList = new ArrayList<>(); + + for (String maxValueColumn:maxValueColumnNameList) { + String colKey = getStateKey(tableName, maxValueColumn.trim()); + maxValueQualifiedColumnNameList.add(colKey); + } + for (int i = 1; i <= numCols; i++) { String colName = resultSetMetaData.getColumnName(i).toLowerCase(); String colKey = getStateKey(tableName, colName); + + //only include columns that are part of the maximum value tracking column list + if (!maxValueQualifiedColumnNameList.contains(colKey)) { + continue; + } + int colType = resultSetMetaData.getColumnType(i); columnTypeMap.putIfAbsent(colKey, colType); } + + for (String maxValueColumn:maxValueColumnNameList) { + String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase()); + if (!columnTypeMap.containsKey(colKey)) { + throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn); + } + } } else { throw new ProcessException("No columns found in table from those 
specified: " + maxValueColumnNames); } @@ -279,6 +323,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact } } + protected static StringBuilder getWrappedQuery(String sqlQuery, String tableName){ + return new StringBuilder("SELECT * FROM (" + sqlQuery + ") AS " + tableName); + } + protected static String getMaxValueFromRow(ResultSet resultSet, int columnIndex, Integer type, http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index d126de1..e041bed 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -28,6 +28,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -162,6 +163,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { } } + @OnStopped + public void stop() { + // Reset the column type map in case properties change + setupComplete.set(false); + } + @Override public 
void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { // Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index fe1e4ef..254c9cb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -79,7 +79,8 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"sql", "select", "jdbc", "query", "database"}) @SeeAlso({GenerateTableFetch.class, ExecuteSQL.class}) -@CapabilityDescription("Generates and executes a SQL select query to fetch all rows whose values in the specified Maximum Value column(s) are larger than the " +@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified " + + "Maximum Value column(s) are larger than the " + "previously-seen maxima. Query result will be converted to Avro format. Expression Language is supported for several properties, but no incoming " + "connections are permitted. 
The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to " + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. " @@ -168,8 +169,13 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final List<PropertyDescriptor> pds = new ArrayList<>(); pds.add(DBCP_SERVICE); pds.add(DB_TYPE); - pds.add(TABLE_NAME); + pds.add(new PropertyDescriptor.Builder() + .fromPropertyDescriptor(TABLE_NAME) + .description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.") + .build()); pds.add(COLUMN_NAMES); + pds.add(WHERE_CLAUSE); + pds.add(SQL_QUERY); pds.add(MAX_VALUE_COLUMN_NAMES); pds.add(QUERY_TIMEOUT); pds.add(FETCH_SIZE); @@ -180,7 +186,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { pds.add(USE_AVRO_LOGICAL_TYPES); pds.add(DEFAULT_PRECISION); pds.add(DEFAULT_SCALE); - pds.add(WHERE_CLAUSE); + propDescriptors = Collections.unmodifiableList(pds); } @@ -220,6 +226,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue(); final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); @@ -275,7 +282,7 
@@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) ? null : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*")); - final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap); + final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap); final StopWatch stopWatch = new StopWatch(true); final String fragmentIdentifier = UUID.randomUUID().toString(); @@ -404,10 +411,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames, String customWhereClause, Map<String, String> stateMap) { + + return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap); + } + + protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames, + String customWhereClause, Map<String, String> stateMap) { if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException("Table name must be specified"); } - final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); + final StringBuilder query; + + if (StringUtils.isEmpty(sqlQuery)) { + query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); + } else { + query = getWrappedQuery(sqlQuery, tableName); + } List<String> whereClauses = new ArrayList<>(); // Check state map for last max values http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index a1f2fb5..510156f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -986,6 +986,189 @@ public class QueryDatabaseTableTest { runner.clearTransferState(); } + @Test + public void testCustomSQL() throws SQLException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + try { + stmt.execute("drop table TYPE_LIST"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, 
name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')"); + + stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))"); + stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')"); + stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')"); + stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setProperty(QueryDatabaseTable.SQL_QUERY, "SELECT id, b.type as gender, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)"); + runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "gender = 'male'"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME)); + assertEquals(flowFile.getAttribute("maxvalue.id"), "0"); + InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); + runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); + assertEquals(1, getNumberOfRecordsFromStream(in)); + + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + //Remove Max Rows Per Flow File + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0"); + + // Add a new row with a higher ID and run, one flowfile with one new row should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE 
(id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Sanity check - run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add timestamp as a max value column name + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, created_on"); + + // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "4"); + assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234"); + in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add a new row with a higher ID and run, one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. 
NiFi', 1.0, '2012-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set + runner.getStateManager().clear(Scope.CLUSTER); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(4, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set + runner.getStateManager().clear(Scope.CLUSTER); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(5, 
getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a higher value for scale than the max, one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set + runner.getStateManager().clear(Scope.CLUSTER); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(6, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a higher value for scale than the max, one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + } + + @Test(expected = AssertionError.class) + public void testMissingColumn() throws ProcessException, ClassNotFoundException, SQLException, InitializationException, IOException { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final 
SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + try { + stmt.execute("drop table TYPE_LIST"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')"); + + stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))"); + stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')"); + stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')"); + stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TYPE_LIST"); + runner.setProperty(QueryDatabaseTable.SQL_QUERY, "SELECT b.type, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)"); + runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "type = 'male'"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2"); + + runner.run(); + } + private long getNumberOfRecordsFromStream(InputStream in) throws IOException { final DatumReader<GenericRecord> 
datumReader = new GenericDatumReader<>(); try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
