Repository: nifi
Updated Branches:
  refs/heads/master ea3c294f9 -> 6d4901cd2


NIFI-2881: Added EL support to DB Fetch processors

- Allow incoming flowfiles to GenerateTableFetch
- Incorporated review comments/discussions
- Updated documentation, added error attribute to GenerateTableFetch
- Corrected notes for column properties in fetch processors

This closes #1407.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6d4901cd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6d4901cd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6d4901cd

Branch: refs/heads/master
Commit: 6d4901cd26388e1810cb670a135d2cf0e87cc8d0
Parents: ea3c294
Author: Matt Burgess <[email protected]>
Authored: Mon Jan 9 13:37:03 2017 -0500
Committer: Koji Kawamura <[email protected]>
Committed: Fri Feb 3 08:51:13 2017 +0900

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         |  64 ++++-
 .../processors/standard/GenerateTableFetch.java | 133 ++++++---
 .../processors/standard/QueryDatabaseTable.java |  79 ++++--
 .../standard/QueryDatabaseTableTest.java        |  19 +-
 .../standard/TestGenerateTableFetch.java        | 267 ++++++++++++++++++-
 5 files changed, 494 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6d4901cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index eda9328..7728af1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -24,6 +26,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -36,6 +39,7 @@ import java.sql.Timestamp;
 import java.text.DecimalFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -97,15 +101,18 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
             .description("The name of the database table to be queried.")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor COLUMN_NAMES = new 
PropertyDescriptor.Builder()
             .name("Columns to Return")
             .description("A comma-separated list of column names to be used in 
the query. If your database requires "
                     + "special treatment of the names (quoting, e.g.), each 
name should include such treatment. If no "
-                    + "column names are supplied, all columns in the specified 
table will be returned.")
+                    + "column names are supplied, all columns in the specified 
table will be returned. NOTE: It is important "
+                    + "to use consistent column names for a given table for 
incremental fetch to work properly.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new 
PropertyDescriptor.Builder()
@@ -117,9 +124,11 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                     + "can be used to retrieve only those rows that have been 
added/updated since the last retrieval. Note that some "
                     + "JDBC types such as bit/boolean are not conducive to 
maintaining maximum value, so columns of these "
                     + "types should not be listed in this property, and will 
result in error(s) during processing. If no columns "
-                    + "are provided, all rows from the table will be 
considered, which could have a performance impact.")
+                    + "are provided, all rows from the table will be 
considered, which could have a performance impact. NOTE: It is important "
+                    + "to use consistent max-value column names for a given 
table for incremental fetch to work properly.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor QUERY_TIMEOUT = new 
PropertyDescriptor.Builder()
@@ -129,6 +138,7 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
             .defaultValue("0 seconds")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new 
PropertyDescriptor.Builder()
@@ -143,11 +153,24 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
 
     protected List<PropertyDescriptor> propDescriptors;
 
+    // The delimiter to use when referencing qualified names (such as 
table@!@column in the state map)
+    protected static final String NAMESPACE_DELIMITER = "@!@";
+
     public static final PropertyDescriptor DB_TYPE;
 
     protected final static Map<String, DatabaseAdapter> dbAdapters = new 
HashMap<>();
     protected final Map<String, Integer> columnTypeMap = new HashMap<>();
 
+    // This value is set when the processor is scheduled and indicates whether 
the Table Name property contains Expression Language.
+    // It is used for backwards-compatibility purposes; if the value is false 
and the fully-qualified state key (table + column) is not found,
+    // the processor will look for a state key with just the column name.
+    protected volatile boolean isDynamicTableName = false;
+
+    // This value is set when the processor is scheduled and indicates whether 
the Maximum Value Columns property contains Expression Language.
+    // It is used for backwards-compatibility purposes; if the table name and 
max-value columns are static, then the column types can be
+    // pre-fetched when the processor is scheduled, rather than having to 
populate them on-the-fly.
+    protected volatile boolean isDynamicMaxValues = false;
+
     private static SimpleDateFormat TIME_TYPE_FORMAT = new 
SimpleDateFormat("HH:mm:ss.SSS");
 
     static {
@@ -166,13 +189,28 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                 .build();
     }
 
+    // A common validation procedure for DB fetch processors, it stores 
whether the Table Name and/or Max Value Column properties have expression 
language
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        // For backwards-compatibility, keep track of whether the table name 
and max-value column properties are dynamic (i.e. has expression language)
+        isDynamicTableName = 
validationContext.isExpressionLanguagePresent(validationContext.getProperty(TABLE_NAME).getValue());
+        isDynamicMaxValues = 
validationContext.isExpressionLanguagePresent(validationContext.getProperty(MAX_VALUE_COLUMN_NAMES).getValue());
+
+        return super.customValidate(validationContext);
+    }
+
     public void setup(final ProcessContext context) {
+        final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+
+        // If there are no max-value column names specified, we don't need to 
perform this processing
+        if (StringUtils.isEmpty(maxValueColumnNames)) {
+            return;
+        }
+
         // Try to fill the columnTypeMap with the types of the desired 
max-value columns
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String tableName = context.getProperty(TABLE_NAME).getValue();
-        final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-        final DatabaseAdapter dbAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
 
+        final DatabaseAdapter dbAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
         try (final Connection con = dbcpService.getConnection();
              final Statement st = con.createStatement()) {
 
@@ -187,10 +225,10 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                 columnTypeMap.clear();
                 for (int i = 1; i <= numCols; i++) {
                     String colName = 
resultSetMetaData.getColumnName(i).toLowerCase();
+                    String colKey = getStateKey(tableName, colName);
                     int colType = resultSetMetaData.getColumnType(i);
-                    columnTypeMap.put(colName, colType);
+                    columnTypeMap.putIfAbsent(colKey, colType);
                 }
-
             } else {
                 throw new ProcessException("No columns found in table from 
those specified: " + maxValueColumnNames);
             }
@@ -378,4 +416,16 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                 return value;
         }
     }
+
+    protected static String getStateKey(String prefix, String columnName) {
+        StringBuilder sb = new StringBuilder();
+        if (prefix != null) {
+            sb.append(prefix.toLowerCase());
+            sb.append(NAMESPACE_DELIMITER);
+        }
+        if (columnName != null) {
+            sb.append(columnName.toLowerCase());
+        }
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6d4901cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index bff1024..966c20d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -21,11 +21,15 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
@@ -49,6 +53,7 @@ import java.sql.Statement;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -60,18 +65,28 @@ import java.util.stream.IntStream;
 
 
 @TriggerSerially
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
-@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class})
+@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, 
ListDatabaseTables.class})
 @CapabilityDescription("Generates SQL select queries that fetch \"pages\" of 
rows from a table. The partition size property, along with the table's row 
count, "
         + "determine the size and number of pages and generated FlowFiles. In 
addition, incremental fetching can be achieved by setting Maximum-Value 
Columns, "
         + "which causes the processor to track the columns' maximum values, 
thus only fetching rows whose columns' values exceed the observed maximums. 
This "
-        + "processor is intended to be run on the Primary Node only.")
+        + "processor is intended to be run on the Primary Node only.\n\n"
+        + "This processor can accept incoming connections; the behavior of the 
processor is different whether incoming connections are provided:\n"
+        + "  - If no incoming connection(s) are specified, the processor will 
generate SQL queries on the specified processor schedule. Expression Language 
is supported for many "
+        + "fields, but no flow file attributes are available. However the 
properties will be evaluated using the Variable Registry.\n"
+        + "  - If incoming connection(s) are specified and no flow file is 
available to a processor task, no work will be performed.\n"
+        + "  - If incoming connection(s) are specified and a flow file is 
available to a processor task, the flow file's attributes may be used in 
Expression Language for such fields "
+        + "as Table Name and others. However, the Max-Value Columns and 
Columns to Return fields must be empty or refer to columns that are available 
in each specified table.")
 @Stateful(scopes = Scope.CLUSTER, description = "After performing a query on 
the specified table, the maximum values for "
         + "the specified column(s) will be retained for use in future 
executions of the query. This allows the Processor "
         + "to fetch only those records that have max values greater than the 
retained values. This can be used for "
         + "incremental fetching, fetching of newly added rows, etc. To clear 
the maximum values, clear the state of the processor "
         + "per the State Management documentation")
+@WritesAttributes({
+        @WritesAttribute(attribute = "generatetablefetch.sql.error", 
description = "If the processor has incoming connections, and processing an 
incoming flow file causes "
+        + "a SQL Exception, the flow file is routed to failure and this 
attribute is set to the exception message.")
+})
 public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
     public static final PropertyDescriptor PARTITION_SIZE = new 
PropertyDescriptor.Builder()
@@ -83,13 +98,20 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                     + "in the table.")
             .defaultValue("10000")
             .required(true)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("This relationship is only used when SQL query 
execution (using an incoming FlowFile) failed. The incoming FlowFile will be 
penalized and routed to this relationship. "
+                    + "If no incoming connection(s) are specified, this 
relationship is unused.")
+            .build();
+
     public GenerateTableFetch() {
         final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
+        r.add(REL_FAILURE);
         relationships = Collections.unmodifiableSet(r);
 
         final List<PropertyDescriptor> pds = new ArrayList<>();
@@ -113,22 +135,41 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
         return propDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        return super.customValidate(validationContext);
+    }
+
     @OnScheduled
     public void setup(final ProcessContext context) {
-        super.setup(context);
+        // Pre-fetch the column types if using a static table name and 
max-value columns
+        if (!isDynamicTableName && !isDynamicMaxValues) {
+            super.setup(context);
+        }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
         ProcessSession session = sessionFactory.createSession();
+
+        FlowFile fileToProcess = null;
+        if (context.hasIncomingConnection()) {
+            fileToProcess = session.get();
+
+            if (fileToProcess == null) {
+                // Incoming connection with no flow file available, do no work 
(see capability description)
+                return;
+            }
+        }
+
         final ComponentLog logger = getLogger();
 
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final DatabaseAdapter dbAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
-        final String tableName = context.getProperty(TABLE_NAME).getValue();
-        final String columnNames = 
context.getProperty(COLUMN_NAMES).getValue();
-        final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-        final int partitionSize = 
context.getProperty(PARTITION_SIZE).asInteger();
+        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String columnNames = 
context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
+        final int partitionSize = 
context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
 
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
@@ -164,11 +205,20 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
             IntStream.range(0, maxValueColumnNameList.size()).forEach((index) 
-> {
                 String colName = maxValueColumnNameList.get(index);
                 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
-                String maxValue = statePropertyMap.get(colName.toLowerCase());
+                final String fullyQualifiedStateKey = getStateKey(tableName, 
colName);
+                String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
+                if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
+                    // If the table name is static and the fully-qualified key 
was not found, try just the column name
+                    maxValue = statePropertyMap.get(getStateKey(null, 
colName));
+                }
                 if (!StringUtils.isEmpty(maxValue)) {
-                    Integer type = columnTypeMap.get(colName.toLowerCase());
+                    Integer type = columnTypeMap.get(fullyQualifiedStateKey);
+                    if (type == null && !isDynamicTableName) {
+                        // If the table name is static and the fully-qualified 
key was not found, try just the column name
+                        type = columnTypeMap.get(getStateKey(null, colName));
+                    }
                     if (type == null) {
-                        // This shouldn't happen as we are populating 
columnTypeMap when the processor is scheduled.
+                        // This shouldn't happen as we are populating 
columnTypeMap when the processor is scheduled or when the first maximum is 
observed
                         throw new IllegalArgumentException("No column type 
found for: " + colName);
                     }
                     // Add a condition for the WHERE clause
@@ -186,7 +236,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
             try (final Connection con = dbcpService.getConnection();
                  final Statement st = con.createStatement()) {
 
-                final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+                final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                 st.setQueryTimeout(queryTimeout); // timeout in seconds
 
                 logger.debug("Executing {}", new Object[]{selectQuery});
@@ -202,40 +252,61 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                     ResultSetMetaData rsmd = resultSet.getMetaData();
                     for (int i = 2; i <= rsmd.getColumnCount(); i++) {
                         String resultColumnName = 
rsmd.getColumnName(i).toLowerCase();
+                        String fullyQualifiedStateKey = getStateKey(tableName, 
resultColumnName);
+                        String resultColumnCurrentMax = 
statePropertyMap.get(fullyQualifiedStateKey);
+                        if (StringUtils.isEmpty(resultColumnCurrentMax) && 
!isDynamicTableName) {
+                            // If we can't find the value at the 
fully-qualified key name and the table name is static, it is possible (under a 
previous scheme)
+                            // the value has been stored under a key that is 
only the column name. Fall back to check the column name; either way, when a new
+                            // maximum value is observed, it will be stored 
under the fully-qualified key from then on.
+                            resultColumnCurrentMax = 
statePropertyMap.get(resultColumnName);
+                        }
+
                         int type = rsmd.getColumnType(i);
+                        if (isDynamicTableName) {
+                            // We haven't pre-populated the column type map if 
the table name is dynamic, so do it here
+                            columnTypeMap.put(fullyQualifiedStateKey, type);
+                        }
                         try {
-                            String newMaxValue = getMaxValueFromRow(resultSet, 
i, type, statePropertyMap.get(resultColumnName.toLowerCase()), 
dbAdapter.getName());
+                            String newMaxValue = getMaxValueFromRow(resultSet, 
i, type, resultColumnCurrentMax, dbAdapter.getName());
                             if (newMaxValue != null) {
-                                statePropertyMap.put(resultColumnName, 
newMaxValue);
+                                statePropertyMap.put(fullyQualifiedStateKey, 
newMaxValue);
                             }
                         } catch (ParseException | IOException pie) {
                             // Fail the whole thing here before we start 
creating flow files and such
                             throw new ProcessException(pie);
                         }
+
                     }
                 } else {
                     // Something is very wrong here, one row (even if count is 
zero) should be returned
                     throw new SQLException("No rows returned from metadata 
query: " + selectQuery);
                 }
-            } catch (SQLException e) {
-                logger.error("Unable to execute SQL select query {} due to 
{}", new Object[]{selectQuery, e});
-                throw new ProcessException(e);
-            }
-            final int numberOfFetches = (partitionSize == 0) ? rowCount : 
(rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
 
+                final int numberOfFetches = (partitionSize == 0) ? rowCount : 
(rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
 
-            // Generate SQL statements to read "pages" of data
-            for (int i = 0; i < numberOfFetches; i++) {
-                FlowFile sqlFlowFile;
+                // Generate SQL statements to read "pages" of data
+                for (int i = 0; i < numberOfFetches; i++) {
+                    Integer limit = partitionSize == 0 ? null : partitionSize;
+                    Integer offset = partitionSize == 0 ? null : i * 
partitionSize;
+                    final String query = 
dbAdapter.getSelectStatement(tableName, columnNames, whereClause, 
StringUtils.join(maxValueColumnNameList, ", "), limit, offset);
+                    FlowFile sqlFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess);
+                    sqlFlowFile = session.write(sqlFlowFile, out -> 
out.write(query.getBytes()));
+                    session.transfer(sqlFlowFile, REL_SUCCESS);
+                }
+
+                if (fileToProcess != null) {
+                    session.remove(fileToProcess);
+                }
+            } catch (SQLException e) {
+                if (fileToProcess != null) {
+                    logger.error("Unable to execute SQL select query {} due to 
{}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
+                    fileToProcess = session.putAttribute(fileToProcess, 
"generatetablefetch.sql.error", e.getMessage());
+                    session.transfer(fileToProcess, REL_FAILURE);
 
-                Integer limit = partitionSize == 0 ? null : partitionSize;
-                Integer offset = partitionSize == 0 ? null : i * partitionSize;
-                final String query = dbAdapter.getSelectStatement(tableName, 
columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), 
limit, offset);
-                sqlFlowFile = session.create();
-                sqlFlowFile = session.write(sqlFlowFile, out -> {
-                    out.write(query.getBytes());
-                });
-                session.transfer(sqlFlowFile, REL_SUCCESS);
+                } else {
+                    logger.error("Unable to execute SQL select query {} due to 
{}", new Object[]{selectQuery, e});
+                    throw new ProcessException(e);
+                }
             }
 
             session.commit();

http://git-wip-us.apache.org/repos/asf/nifi/blob/6d4901cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 2fddb87..1d898b4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -70,9 +71,13 @@ import java.util.stream.IntStream;
 @EventDriven
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"sql", "select", "jdbc", "query", "database"})
-@CapabilityDescription("Execute provided SQL select query. Query result will 
be converted to Avro format."
-        + " Streaming is used so arbitrarily large result sets are supported. 
This processor can be scheduled to run on "
-        + "a timer or cron expression, using the standard scheduling methods. 
FlowFile attribute "
+@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
+@CapabilityDescription("Generates and executes a SQL select query to fetch all 
rows whose values in the specified Maximum Value column(s) are larger than the "
+        + "previously-seen maxima. Query result will be converted to Avro 
format. Expression Language is supported for several properties, but no 
incoming "
+        + "connections are permitted. The Variable Registry may be used to 
provide values for any property containing Expression Language. If it is 
desired to "
+        + "leverage flow file attributes to perform these queries, the 
GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
+        + "Streaming is used so arbitrarily large result sets are supported. 
This processor can be scheduled to run on "
+        + "a timer or cron expression, using the standard scheduling methods. 
This processor is intended to be run on the Primary Node only. FlowFile 
attribute "
         + "'querydbtable.row.count' indicates how many rows were selected.")
 @Stateful(scopes = Scope.CLUSTER, description = "After performing a query on 
the specified table, the maximum values for "
         + "the specified column(s) will be retained for use in future 
executions of the query. This allows the Processor "
@@ -80,7 +85,7 @@ import java.util.stream.IntStream;
         + "incremental fetching, fetching of newly added rows, etc. To clear 
the maximum values, clear the state of the processor "
         + "per the State Management documentation")
 @WritesAttributes({
-        @WritesAttribute(attribute = "querydbtable.row.count"),
+        @WritesAttribute(attribute = "querydbtable.row.count", 
description="The number of rows selected by the query"),
         @WritesAttribute(attribute="fragment.identifier", description="If 'Max 
Rows Per Flow File' is set then all FlowFiles from the same query result set "
                 + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute="fragment.count", description="If 'Max Rows 
Per Flow File' is set then this is the total number of  "
@@ -107,6 +112,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new 
PropertyDescriptor.Builder()
@@ -117,6 +123,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_FRAGMENTS = new 
PropertyDescriptor.Builder()
@@ -127,6 +134,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public QueryDatabaseTable() {
@@ -184,13 +192,13 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final DatabaseAdapter dbAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
-        final String tableName = context.getProperty(TABLE_NAME).getValue();
-        final String columnNames = 
context.getProperty(COLUMN_NAMES).getValue();
-        final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-        final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
-        final Integer maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
+        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        final String columnNames = 
context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final Integer fetchSize = 
context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final Integer maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
         final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
-                ? context.getProperty(MAX_FRAGMENTS).asInteger()
+                ? 
context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
                 : 0;
         final boolean convertNamesForAvro = 
context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
 
@@ -212,9 +220,21 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final Map<String, String> statePropertyMap = new 
HashMap<>(stateMap.toMap());
 
         //If an initial max value for column(s) has been specified using 
properties, and this column is not in the state manager, sync them to the state 
property map
-        for(final Map.Entry<String,String> maxProp : 
maxValueProperties.entrySet()){
-            if (!statePropertyMap.containsKey(maxProp.getKey().toLowerCase())) 
{
-                statePropertyMap.put(maxProp.getKey().toLowerCase(), 
maxProp.getValue());
+        for (final Map.Entry<String, String> maxProp : 
maxValueProperties.entrySet()) {
+            String maxPropKey = maxProp.getKey().toLowerCase();
+            String fullyQualifiedMaxPropKey = getStateKey(tableName, 
maxPropKey);
+            if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
+                String newMaxPropValue;
+                // If we can't find the value at the fully-qualified key name, 
it is possible (under a previous scheme)
+                // the value has been stored under a key that is only the 
column name. Fall back to check the column name,
+                // but store the new initial max value under the 
fully-qualified key.
+                if (statePropertyMap.containsKey(maxPropKey)) {
+                    newMaxPropValue = statePropertyMap.get(maxPropKey);
+                } else {
+                    newMaxPropValue = maxProp.getValue();
+                }
+                statePropertyMap.put(fullyQualifiedMaxPropKey, 
newMaxPropValue);
+
             }
         }
 
@@ -247,7 +267,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 // Ignore and use default JDBC URL. This shouldn't happen 
unless the driver doesn't implement getMetaData() properly
             }
 
-            final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+            final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
             st.setQueryTimeout(queryTimeout); // timeout in seconds
             try {
                 logger.debug("Executing query {}", new Object[]{selectQuery});
@@ -304,7 +324,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 for (int i = 0; i < resultSetFlowFiles.size(); i++) {
                     // Add maximum values as attributes
                     for (Map.Entry<String, String> entry : 
statePropertyMap.entrySet()) {
-                        resultSetFlowFiles.set(i, 
session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), 
entry.getValue()));
+                        // Get just the column name from the key
+                        String key = entry.getKey();
+                        String colName = 
key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + 
NAMESPACE_DELIMITER.length());
+                        resultSetFlowFiles.set(i, 
session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, 
entry.getValue()));
                     }
 
                     //set count on all FlowFiles
@@ -349,9 +372,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             List<String> whereClauses = new 
ArrayList<>(maxValColumnNames.size());
             IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
                 String colName = maxValColumnNames.get(index);
-                String maxValue = stateMap.get(colName.toLowerCase());
+                String maxValueKey = getStateKey(tableName, colName);
+                String maxValue = stateMap.get(maxValueKey);
+                if (StringUtils.isEmpty(maxValue)) {
+                    // If we can't find the value at the fully-qualified key 
name, it is possible (under a previous scheme)
+                    // the value has been stored under a key that is only the 
column name. Fall back to check the column name; either way, when a new
+                    // maximum value is observed, it will be stored under the 
fully-qualified key from then on.
+                    maxValue = stateMap.get(colName.toLowerCase());
+                }
                 if (!StringUtils.isEmpty(maxValue)) {
-                    Integer type = columnTypeMap.get(colName.toLowerCase());
+                    Integer type = columnTypeMap.get(maxValueKey);
                     if (type == null) {
                         // This shouldn't happen as we are populating 
columnTypeMap when the processor is scheduled.
                         throw new IllegalArgumentException("No column type 
found for: " + colName);
@@ -371,7 +401,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
 
     protected Map<String,String> getDefaultMaxValueProperties(final 
Map<PropertyDescriptor, String> properties){
-        final Map<String,String> defaultMaxValues = new HashMap<String, 
String>();
+        final Map<String,String> defaultMaxValues = new HashMap<>();
 
         for (final Map.Entry<PropertyDescriptor, String> entry : 
properties.entrySet()) {
             final String key = entry.getKey().getName();
@@ -407,15 +437,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 if (nrOfColumns > 0) {
                     for (int i = 1; i <= nrOfColumns; i++) {
                         String colName = meta.getColumnName(i).toLowerCase();
-                        Integer type = columnTypeMap.get(colName);
+                        String fullyQualifiedMaxValueKey = 
getStateKey(meta.getTableName(i), colName);
+                        Integer type = 
columnTypeMap.get(fullyQualifiedMaxValueKey);
                         // Skip any columns we're not keeping track of or 
whose value is null
                         if (type == null || resultSet.getObject(i) == null) {
                             continue;
                         }
-                        String maxValueString = newColMap.get(colName);
+                        String maxValueString = 
newColMap.get(fullyQualifiedMaxValueKey);
+                        // If we can't find the value at the fully-qualified 
key name, it is possible (under a previous scheme)
+                        // the value has been stored under a key that is only 
the column name. Fall back to check the column name; either way, when a new
+                        // maximum value is observed, it will be stored under 
the fully-qualified key from then on.
+                        if (StringUtils.isEmpty(maxValueString)) {
+                            maxValueString = newColMap.get(colName);
+                        }
                         String newMaxValueString = 
getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
                         if (newMaxValueString != null) {
-                            newColMap.put(colName, newMaxValueString);
+                            newColMap.put(fullyQualifiedMaxValueKey, 
newMaxValueString);
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6d4901cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 3353a87..92f4757 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -72,6 +72,8 @@ public class QueryDatabaseTableTest {
     private final static String DB_LOCATION = "target/db_qdt";
     private DatabaseAdapter dbAdapter;
     private HashMap<String, DatabaseAdapter> origDbAdapters;
+    private final static String TABLE_NAME_KEY = "tableName";
+    private final static String MAX_ROWS_KEY = "maxRows";
 
 
     @BeforeClass
@@ -142,13 +144,13 @@ public class QueryDatabaseTableTest {
         maxValues.put("id", "509");
         StateManager stateManager = runner.getStateManager();
         stateManager.setState(maxValues, Scope.CLUSTER);
-        processor.putColumnType("id", Types.INTEGER);
+        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
         query = processor.getQuery(dbAdapter, "myTable", null, 
Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509", query);
 
         maxValues.put("date_created", "2016-03-07 12:34:56");
         stateManager.setState(maxValues, Scope.CLUSTER);
-        processor.putColumnType("date_created", Types.TIMESTAMP);
+        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", 
Types.TIMESTAMP);
         query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED"), 
stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= 
'2016-03-07 12:34:56'", query);
 
@@ -460,6 +462,7 @@ public class QueryDatabaseTableTest {
 
         runner.setIncomingConnection(false);
         runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
+        
runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
 
         QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new 
GenericDatabaseAdapter() {
             @Override
@@ -521,7 +524,8 @@ public class QueryDatabaseTableTest {
         runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
         runner.setIncomingConnection(false);
         runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
-        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, 
"9");//Using a non-round number to make sure the last file is ragged
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "${" + 
MAX_ROWS_KEY + "}");
+        runner.setVariable(MAX_ROWS_KEY, "9");
 
         runner.run();
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
12);
@@ -675,7 +679,8 @@ public class QueryDatabaseTableTest {
             cal.add(Calendar.MINUTE, 1);
         }
 
-        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + 
TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
         runner.setIncomingConnection(false);
         runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, 
"created_on");
 
@@ -687,14 +692,14 @@ public class QueryDatabaseTableTest {
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
         in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
         assertEquals(4, getNumberOfRecordsFromStream(in));
-        runner.getStateManager().assertStateEquals("created_on", "1970-01-01 
00:09:00.0", Scope.CLUSTER);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 
00:09:00.0", Scope.CLUSTER);
         runner.clearTransferState();
 
         // Run again, this time no flowfiles/rows should be transferred
         // Validate Max Value doesn't change also
         runner.run();
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
-        runner.getStateManager().assertStateEquals("created_on", "1970-01-01 
00:09:00.0", Scope.CLUSTER);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 
00:09:00.0", Scope.CLUSTER);
         runner.clearTransferState();
 
         // Append a new row, expect 1 flowfile one row
@@ -707,7 +712,7 @@ public class QueryDatabaseTableTest {
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
         in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
         assertEquals(1, getNumberOfRecordsFromStream(in));
-        runner.getStateManager().assertStateEquals("created_on", "1970-01-01 
00:10:00.0", Scope.CLUSTER);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 
00:10:00.0", Scope.CLUSTER);
         runner.clearTransferState();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6d4901cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 8a8aa01..f79f96c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -95,7 +96,7 @@ public class TestGenerateTableFetch {
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
 
-        runner = TestRunners.newTestRunner(GenerateTableFetch.class);
+        runner = TestRunners.newTestRunner(processor);
         runner.addControllerService("dbcp", dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
         runner.setProperty(GenerateTableFetch.DBCP_SERVICE, "dbcp");
@@ -251,9 +252,271 @@ public class TestGenerateTableFetch {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testMultiplePartitionsIncomingFlowFiles() throws 
ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE1");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE1 (id integer not null, 
bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (0, 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (1, 
0)");
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, 
bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, bucket) VALUES (0, 
0)");
+
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
+        runner.setIncomingConnection(true);
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "${partSize}");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE1");
+            put("partSize", "1");
+        }});
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE2");
+            put("partSize", "2");
+        }});
+
+        // The table does not exist, expect the original flow file to be 
routed to failure
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE3");
+            put("partSize", "1");
+        }});
+
+        runner.run(3);
+        runner.assertTransferCount(AbstractDatabaseFetchProcessor.REL_SUCCESS, 
3);
+
+        // Two records from table 1
+        
assertEquals(runner.getFlowFilesForRelationship(AbstractDatabaseFetchProcessor.REL_SUCCESS).stream().filter(
+                (ff) -> 
"TEST_QUERY_DB_TABLE1".equals(ff.getAttribute("tableName"))).count(),
+                2);
+
+        // One record from table 2
+        
assertEquals(runner.getFlowFilesForRelationship(AbstractDatabaseFetchProcessor.REL_SUCCESS).stream().filter(
+                (ff) -> 
"TEST_QUERY_DB_TABLE2".equals(ff.getAttribute("tableName"))).count(),
+                1);
+
+        // Table 3 doesn't exist, should be routed to failure
+        runner.assertTransferCount(GenerateTableFetch.REL_FAILURE, 1);
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE1");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+    }
+
+    @Test
+    public void 
testBackwardsCompatibilityStateKeyStaticTableDynamicMaxValues() throws 
Exception {
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 
0)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(true);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, 
"${maxValueCol}");
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("maxValueCol", "id");
+        }});
+
+        // Pre-populate the state with a key for column name (not 
fully-qualified)
+        StateManager stateManager = runner.getStateManager();
+        stateManager.setState(new HashMap<String, String>() {{
+            put("id", "0");
+        }}, Scope.CLUSTER);
+
+        // Pre-populate the column type map with an entry for id (not 
fully-qualified)
+        processor.columnTypeMap.put("id", 4);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 ORDER BY 
id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+    }
+
+    @Test
+    public void 
testBackwardsCompatibilityStateKeyDynamicTableDynamicMaxValues() throws 
Exception {
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 
0)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
+        runner.setIncomingConnection(true);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, 
"${maxValueCol}");
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+            put("maxValueCol", "id");
+        }});
+
+        // Pre-populate the state with a key for column name (not 
fully-qualified)
+        StateManager stateManager = runner.getStateManager();
+        stateManager.setState(new HashMap<String, String>() {{
+            put("id", "0");
+        }}, Scope.CLUSTER);
+
+        // Pre-populate the column type map with an entry for id (not 
fully-qualified)
+        processor.columnTypeMap.put("id", 4);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        // Note there is no WHERE clause here. Because we are using dynamic 
tables, the old state key/value is not retrieved
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 
10000 ROWS ONLY", new String(flowFile.toByteArray()));
+
+        runner.clearTransferState();
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 
0)");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+            put("maxValueCol", "id");
+        }});
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY 
id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+    }
+
+    @Test
+    public void 
testBackwardsCompatibilityStateKeyDynamicTableStaticMaxValues() throws 
Exception {
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 
0)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
+        runner.setIncomingConnection(true);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "id");
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+        }});
+
+        // Pre-populate the state with a key for column name (not 
fully-qualified)
+        StateManager stateManager = runner.getStateManager();
+        stateManager.setState(new HashMap<String, String>() {{
+            put("id", "0");
+        }}, Scope.CLUSTER);
+
+        // Pre-populate the column type map with an entry for id (not 
fully-qualified)
+        processor.columnTypeMap.put("id", 4);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        // Note there is no WHERE clause here. Because we are using dynamic 
tables, the old state key/value is not retrieved
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 
10000 ROWS ONLY", new String(flowFile.toByteArray()));
+
+        runner.clearTransferState();
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 
0)");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+            put("maxValueCol", "id");
+        }});
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY 
id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+    }
+
+    @Test
+    public void testBackwardsCompatibilityStateKeyVariableRegistry() throws 
Exception {
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 
0)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, 
"${maxValueCol}");
+
+        runner.setVariable("tableName", "TEST_QUERY_DB_TABLE");
+        runner.setVariable("maxValueCol", "id");
+
+        // Pre-populate the state with a key for column name (not 
fully-qualified)
+        StateManager stateManager = runner.getStateManager();
+        stateManager.setState(new HashMap<String, String>() {{
+            put("id", "0");
+        }}, Scope.CLUSTER);
+
+        // Pre-populate the column type map with an entry for id (not 
fully-qualified)
+        processor.columnTypeMap.put("id", 4);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        // Note there is no WHERE clause here. Because we are using dynamic 
tables (i.e. Expression Language,
+        // even when not referring to flow file attributes), the old state 
key/value is not retrieved
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 
10000 ROWS ONLY", new String(flowFile.toByteArray()));
+    }
+
 
     /**
-     * Simple implementation only for ListDatabaseTables processor testing.
+     * Simple implementation only for GenerateTableFetch processor testing.
      */
     private class DBCPServiceSimpleImpl extends AbstractControllerService 
implements DBCPService {
 

Reply via email to