NIFI-4773 - Fixed column type map initialization in QueryDatabaseTable

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2504.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b0142dcf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b0142dcf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b0142dcf

Branch: refs/heads/HDF-3.1-maint
Commit: b0142dcf987734956ba442f1c55ee1c32152c6fb
Parents: 1bed849
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Fri Mar 2 11:19:43 2018 -0500
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Mon Apr 16 11:20:06 2018 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         |  9 +-
 .../processors/standard/QueryDatabaseTable.java |  7 ++
 .../standard/QueryDatabaseTableTest.java        | 79 ++++++++++++++++
 .../standard/TestGenerateTableFetch.java        | 96 ++++++++++++++++++++
 4 files changed, 183 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 2145929..11a7685 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -227,14 +227,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
         return super.customValidate(validationContext);
     }
 
-    @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-        // If the max-value columns have changed, we need to re-fetch the 
column info from the DB
-        if (MAX_VALUE_COLUMN_NAMES.equals(descriptor) && newValue != null && 
!newValue.equals(oldValue)) {
-            setupComplete.set(false);
-        }
-    }
-
     public void setup(final ProcessContext context) {
         setup(context,true,null);
     }
@@ -246,6 +238,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
 
             // If there are no max-value column names specified, we don't need 
to perform this processing
             if (StringUtils.isEmpty(maxValueColumnNames)) {
+                setupComplete.set(true);
                 return;
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index ffe5845..2783d3c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -28,6 +28,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -180,6 +181,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         maxValueProperties = 
getDefaultMaxValueProperties(context.getProperties());
     }
 
+    @OnStopped
+    public void stop() {
+        // Reset the column type map in case properties change
+        setupComplete.set(false);
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
         // Fetch the column/table info once

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8a2319b..9b0bc99 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -308,6 +308,85 @@ public class QueryDatabaseTableTest {
     }
 
     @Test
+    public void testAddedRowsTwoTables() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
2);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", 
flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+        runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+        assertEquals(2, getNumberOfRecordsFromStream(in));
+
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Populate a second table and set
+        stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE2");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE2", 
flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(3, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flowfile with one new 
row should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+
+        // Sanity check - run again, this time no flowfiles/rows should be 
transferred
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testMultiplePartitions() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
 
         // load test data to database

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0142dcf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 67a9bad..117b47e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -219,6 +219,102 @@ public class TestGenerateTableFetch {
     }
 
     @Test
+    public void testAddedRowsTwoTables() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY 
ID FETCH NEXT 10000 ROWS ONLY", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be three records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Create and populate a new table and re-run
+        stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, 
"TEST_QUERY_DB_TABLE2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID <= 2 ORDER 
BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be three records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Add 3 new rows with a higher ID and run with a partition size of 2. 
Two flow files should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, 
created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        // Verify first flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID > 2 AND ID 
<= 5 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+
+        // Verify second flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID > 2 AND ID 
<= 5 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testAddedRowsRightBounded() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
 
         // load test data to database

Reply via email to