Repository: nifi
Updated Branches:
  refs/heads/master b7272e3f3 -> 82ac81553


NIFI-1706: Extend QueryDatabaseTable to support arbitrary queries

- Only include Maximum Value columns in the type map.
- Squashed commits from the previous PR
- Rebased against the latest master
- Added a stop method to GenerateTableFetch so that it refreshes the column type map when it gets restarted
- Fixed whitespace around if/for statements
- Updated the expressionLanguageSupported value since it was not auto-merged correctly

This closes #2618.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>
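
For context, the heart of this change is that a user-supplied query is not executed as-is: it is wrapped as a derived table (aliased with the Table Name property) so the processor can still append its own predicates, such as max-value tracking conditions, the Additional WHERE clause, and the "WHERE 1=0" metadata probe. A minimal sketch of the wrapping idea, using hypothetical query text and alias (not taken from the commit):

    public class WrappedQuerySketch {
        public static void main(String[] args) {
            // Hypothetical custom query and alias; only the wrapping shape matches the commit
            final String sqlQuery = "SELECT id, name FROM people INNER JOIN types ON people.type_id = types.type_id";
            final String tableName = "people_alias";
            // Same shape as AbstractDatabaseFetchProcessor.getWrappedQuery(sqlQuery, tableName)
            final String wrapped = "SELECT * FROM (" + sqlQuery + ") AS " + tableName;
            // The processor then appends predicates to the wrapper, e.g. the metadata
            // probe used during setup:
            System.out.println(wrapped + " WHERE 1=0");
        }
    }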


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/82ac8155
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/82ac8155
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/82ac8155

Branch: refs/heads/master
Commit: 82ac815536d53c00f848f2eae79474035a9eb126
Parents: b7272e3
Author: patricker <patric...@gmail.com>
Authored: Tue Sep 19 13:50:06 2017 +0800
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Fri Apr 13 16:30:13 2018 +0900

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         |  52 +++++-
 .../processors/standard/GenerateTableFetch.java |   7 +
 .../processors/standard/QueryDatabaseTable.java |  29 ++-
 .../standard/QueryDatabaseTableTest.java        | 183 +++++++++++++++++++
 4 files changed, 264 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 8c22fed..c7bad42 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -44,6 +44,7 @@ import java.text.DecimalFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -152,12 +153,22 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
     public static final PropertyDescriptor WHERE_CLAUSE = new PropertyDescriptor.Builder()
             .name("db-fetch-where-clause")
             .displayName("Additional WHERE clause")
-            .description("A custom clause to be added in the WHERE condition when generating SQL requests.")
+            .description("A custom clause to be added in the WHERE condition when building SQL queries.")
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor SQL_QUERY = new PropertyDescriptor.Builder()
+            .name("db-fetch-sql-query")
+            .displayName("Custom Query")
+            .description("A custom SQL query used to retrieve data. Instead of building a SQL query from "
+                    + "other properties, this query will be wrapped as a sub-query. Query must have no ORDER BY statement.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     protected List<PropertyDescriptor> propDescriptors;
 
     // The delimiter to use when referencing qualified names (such as table@!@column in the state map)
@@ -246,6 +257,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             // Try to fill the columnTypeMap with the types of the desired max-value columns
             final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
             final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
 
             final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
             try (final Connection con = dbcpService.getConnection();
@@ -254,7 +266,17 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
                 // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
                 // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
                 // approach as in Apache Drill
-                String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+                String query;
+
+                if (StringUtils.isEmpty(sqlQuery)) {
+                    query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+                } else {
+                    StringBuilder sbQuery = getWrappedQuery(sqlQuery, tableName);
+                    sbQuery.append(" WHERE 1=0");
+
+                    query = sbQuery.toString();
+                }
+
                 ResultSet resultSet = st.executeQuery(query);
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                 int numCols = resultSetMetaData.getColumnCount();
@@ -262,12 +284,34 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
                     if (shouldCleanCache) {
                         columnTypeMap.clear();
                     }
+
+                    final List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.toLowerCase().split(","));
+                    final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();
+
+                    for (String maxValueColumn:maxValueColumnNameList) {
+                        String colKey = getStateKey(tableName, maxValueColumn.trim());
+                        maxValueQualifiedColumnNameList.add(colKey);
+                    }
+
                     for (int i = 1; i <= numCols; i++) {
                         String colName = resultSetMetaData.getColumnName(i).toLowerCase();
                         String colKey = getStateKey(tableName, colName);
+
+                        // Only include columns that are part of the maximum value tracking column list
+                        if (!maxValueQualifiedColumnNameList.contains(colKey)) {
+                            continue;
+                        }
+
                         int colType = resultSetMetaData.getColumnType(i);
                         columnTypeMap.putIfAbsent(colKey, colType);
                     }
+
+                    for (String maxValueColumn:maxValueColumnNameList) {
+                        String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase());
+                        if (!columnTypeMap.containsKey(colKey)) {
+                            throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
+                        }
+                    }
                 } else {
                     throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
                 }
@@ -279,6 +323,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
         }
     }
 
+    protected static StringBuilder getWrappedQuery(String sqlQuery, String tableName) {
+        return new StringBuilder("SELECT * FROM (" + sqlQuery + ") AS " + tableName);
+    }
+
     protected static String getMaxValueFromRow(ResultSet resultSet,
                                                int columnIndex,
                                                Integer type,

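A rough sketch of what the setup pass above does for a custom query, using a plain javax.sql.DataSource in place of NiFi's DBCPService (the DataSource and method names are illustrative):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import javax.sql.DataSource;

    class ColumnTypeProbe {
        // Probe the wrapped query for column metadata without fetching any rows.
        static void probe(DataSource dataSource, String sqlQuery, String tableName) throws SQLException {
            final String query = "SELECT * FROM (" + sqlQuery + ") AS " + tableName + " WHERE 1=0";
            try (Connection con = dataSource.getConnection();
                 Statement st = con.createStatement();
                 ResultSet rs = st.executeQuery(query)) {
                final ResultSetMetaData md = rs.getMetaData();
                for (int i = 1; i <= md.getColumnCount(); i++) {
                    // The commit caches only the configured Maximum Value columns here,
                    // so the type map stays small even if the query selects many columns.
                    System.out.println(md.getColumnName(i).toLowerCase() + " -> " + md.getColumnType(i));
                }
            }
        }
    }
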
http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index d126de1..e041bed 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -28,6 +28,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -162,6 +163,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
         }
     }
 
+    @OnStopped
+    public void stop() {
+        // Reset the column type map in case properties change
+        setupComplete.set(false);
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         // Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later

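The @OnStopped hook above matters because the column type map is built once while the processor runs: resetting the setup flag on stop means a processor restarted with changed properties (for example, a new Custom Query) rebuilds the map instead of reusing stale column types. A minimal sketch of the pattern; everything other than the setupComplete flag and the reset in stop() is illustrative:

    import java.util.concurrent.atomic.AtomicBoolean;

    class OneTimeSetup {
        private final AtomicBoolean setupComplete = new AtomicBoolean(false);

        void onTrigger() {
            if (setupComplete.compareAndSet(false, true)) {
                // Expensive one-time work, e.g. building the column type map
            }
        }

        // Analogous to GenerateTableFetch.stop(): force setup to run again on restart
        void onStopped() {
            setupComplete.set(false);
        }
    }
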
http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index fe1e4ef..254c9cb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -79,7 +79,8 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"sql", "select", "jdbc", "query", "database"})
 @SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
-@CapabilityDescription("Generates and executes a SQL select query to fetch all rows whose values in the specified Maximum Value column(s) are larger than the "
+@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified "
+        + "Maximum Value column(s) are larger than the "
         + "previously-seen maxima. Query result will be converted to Avro format. Expression Language is supported for several properties, but no incoming "
         + "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
         + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
@@ -168,8 +169,13 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final List<PropertyDescriptor> pds = new ArrayList<>();
         pds.add(DBCP_SERVICE);
         pds.add(DB_TYPE);
-        pds.add(TABLE_NAME);
+        pds.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(TABLE_NAME)
+                .description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.")
+                .build());
         pds.add(COLUMN_NAMES);
+        pds.add(WHERE_CLAUSE);
+        pds.add(SQL_QUERY);
         pds.add(MAX_VALUE_COLUMN_NAMES);
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
@@ -180,7 +186,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(USE_AVRO_LOGICAL_TYPES);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);
-        pds.add(WHERE_CLAUSE);
+
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -220,6 +226,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
         final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
         final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
         final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -275,7 +282,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                 ? null
                 : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
-        final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
+        final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
         final StopWatch stopWatch = new StopWatch(true);
         final String fragmentIdentifier = UUID.randomUUID().toString();
 
@@ -404,10 +411,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
     protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
                               String customWhereClause, Map<String, String> stateMap) {
+
+        return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
+    }
+
+    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
+                              String customWhereClause, Map<String, String> stateMap) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name must be specified");
         }
-        final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
+        final StringBuilder query;
+
+        if (StringUtils.isEmpty(sqlQuery)) {
+            query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
+        } else {
+            query = getWrappedQuery(sqlQuery, tableName);
+        }
 
         List<String> whereClauses = new ArrayList<>();
         // Check state map for last max values

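Putting the pieces together, onTrigger now threads the optional custom query into getQuery; when it is set, the generated statement is the wrapped sub-query with any state-derived max-value conditions and the custom WHERE clause appended. A hedged illustration, where the column names, values, and exact clause formatting are hypothetical:

    public class CustomQuerySketch {
        public static void main(String[] args) {
            // With SQL_QUERY = "SELECT id, status FROM people", TABLE_NAME = "people_alias",
            // a tracked max-value column "id" whose last stored value was 42, and an
            // Additional WHERE clause of "status = 'active'", the generated statement
            // would look roughly like:
            System.out.println("SELECT * FROM (SELECT id, status FROM people) AS people_alias"
                    + " WHERE id > 42 AND status = 'active'");
        }
    }
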
http://git-wip-us.apache.org/repos/asf/nifi/blob/82ac8155/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index a1f2fb5..510156f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -986,6 +986,189 @@ public class QueryDatabaseTableTest {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testCustomSQL() throws SQLException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        try {
+            stmt.execute("drop table TYPE_LIST");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
type varchar(20), name varchar(100), scale float, created_on timestamp, bignum 
bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, 
created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, 
created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 
03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, 
created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        stmt.execute("create table TYPE_LIST (type_id integer not null, type 
varchar(20), descr varchar(255))");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 
'male', 'Man')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 
'female', 'Woman')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, 
'', 'Unspecified')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setProperty(QueryDatabaseTable.SQL_QUERY, "SELECT id, b.type as 
gender, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a 
INNER JOIN TYPE_LIST b ON (a.type=b.type)");
+        runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "gender = 'male'");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "0");
+        InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+        runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Remove Max Rows Per Flow File
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "0");
+
+        // Add a new row with a higher ID; it is filtered out by the custom WHERE clause (gender = 'male'), so no flow files should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Sanity check - run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add timestamp as a max value column name
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, 
created_on");
+
+        // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
+        assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(4, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a "higher" name than the max but lower than 
"NULL" (to test that null values are skipped), one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, 
created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for scale has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(5, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for scale than the max, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set bignum as the max value column name (and clear the state), all rows should be returned since the max value for bignum has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(6, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for bignum than the max, but one that is filtered out by the custom WHERE clause, so no flow files will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testMissingColumn() throws ProcessException, ClassNotFoundException, SQLException, InitializationException, IOException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        try {
+            stmt.execute("drop table TYPE_LIST");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
type varchar(20), name varchar(100), scale float, created_on timestamp, bignum 
bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, 
created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, 
created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 
03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, 
created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        stmt.execute("create table TYPE_LIST (type_id integer not null, type 
varchar(20), descr varchar(255))");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 
'male', 'Man')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 
'female', 'Woman')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, 
'', 'Unspecified')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TYPE_LIST");
+        runner.setProperty(QueryDatabaseTable.SQL_QUERY, "SELECT b.type, 
b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN 
TYPE_LIST b ON (a.type=b.type)");
+        runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "type = 'male'");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
+
+        runner.run();
+    }
+
     private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
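
A closing note on testMissingColumn above: the custom query never selects the configured Maximum Value column (ID), so the wrapped metadata probe exposes no such column, setup throws the "Column not found in the table/query specified" ProcessException, and the TestRunner surfaces that failure as the AssertionError the test expects. A minimal rendering of the failing probe, where only the query text and alias come from the test itself:

    public class MissingColumnSketch {
        public static void main(String[] args) {
            final String sqlQuery = "SELECT b.type, b.descr, name, scale, created_on, bignum"
                    + " FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)";
            // No "id" column is selected, so the max-value check for "ID" cannot be satisfied
            System.out.println("SELECT * FROM (" + sqlQuery + ") AS TYPE_LIST WHERE 1=0");
        }
    }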
