NIFI-4773 - Fixed column type map initialization in QueryDatabaseTable Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
This closes #2504. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b0142dcf Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b0142dcf Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b0142dcf Branch: refs/heads/HDF-3.1-maint Commit: b0142dcf987734956ba442f1c55ee1c32152c6fb Parents: 1bed849 Author: Matthew Burgess <mattyb...@apache.org> Authored: Fri Mar 2 11:19:43 2018 -0500 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Mon Apr 16 11:20:06 2018 -0400 ---------------------------------------------------------------------- .../AbstractDatabaseFetchProcessor.java | 9 +- .../processors/standard/QueryDatabaseTable.java | 7 ++ .../standard/QueryDatabaseTableTest.java | 79 ++++++++++++++++ .../standard/TestGenerateTableFetch.java | 96 ++++++++++++++++++++ 4 files changed, 183 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 2145929..11a7685 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -227,14 +227,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact return super.customValidate(validationContext); } - @Override - public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - // If the max-value columns have changed, we need to re-fetch the column info from the DB - if (MAX_VALUE_COLUMN_NAMES.equals(descriptor) && newValue != null && !newValue.equals(oldValue)) { - setupComplete.set(false); - } - } - public void setup(final ProcessContext context) { setup(context,true,null); } @@ -246,6 +238,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact // If there are no max-value column names specified, we don't need to perform this processing if (StringUtils.isEmpty(maxValueColumnNames)) { + setupComplete.set(true); return; } http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index ffe5845..2783d3c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -28,6 +28,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; @@ -180,6 +181,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); } + @OnStopped + public void stop() { + // Reset the column type map in case properties change + setupComplete.set(false); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { // Fetch the column/table info once http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 8a2319b..9b0bc99 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -308,6 +308,85 @@ public class QueryDatabaseTableTest { } @Test + public void testAddedRowsTwoTables() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME)); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); + runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); + assertEquals(2, getNumberOfRecordsFromStream(in)); + + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Populate a second table and set + stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE2"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE2", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME)); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(3, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a higher ID and run, one flowfile with one new row should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "3"); + in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + + // Sanity check - run again, this time no flowfiles/rows should be transferred + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + } + + @Test public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException { // load test data to database http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 67a9bad..117b47e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -219,6 +219,102 @@ public class TestGenerateTableFetch { } @Test + public void testAddedRowsTwoTables() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + String query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + ResultSet resultSet = stmt.executeQuery(query); + // Should be three records + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0); + runner.clearTransferState(); + + // Create and populate a new table and re-run + stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE2"); + + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + resultSet = stmt.executeQuery(query); + // Should be three records + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + + // Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2"); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2); + + // Verify first flow file's contents + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID > 2 AND ID <= 5 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query); + resultSet = stmt.executeQuery(query); + // Should be two records + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + + // Verify second flow file's contents + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1); + query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID > 2 AND ID <= 5 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query); + resultSet = stmt.executeQuery(query); + // Should be one record + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + } + + @Test public void testAddedRowsRightBounded() throws ClassNotFoundException, SQLException, InitializationException, IOException { // load test data to database