Repository: nifi
Updated Branches:
  refs/heads/master 148b4497b -> 98395de74


NIFI-1575: Add QueryDatabaseTable processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/98395de7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/98395de7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/98395de7

Branch: refs/heads/master
Commit: 98395de74ffb0d5cd6c7b8842b501f81b2d3ed92
Parents: 148b449
Author: Matt Burgess <[email protected]>
Authored: Tue Mar 15 22:55:34 2016 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Mar 16 15:55:57 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 601 +++++++++++++++++++
 .../processors/standard/util/JdbcCommon.java    |  45 +-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/QueryDatabaseTableTest.java        | 356 +++++++++++
 4 files changed, 998 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/98395de7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
new file mode 100644
index 0000000..08f6b41
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.LongHolder;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.text.DecimalFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.sql.Types.ARRAY;
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.BLOB;
+import static java.sql.Types.BOOLEAN;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.DATE;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.FLOAT;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NUMERIC;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.REAL;
+import static java.sql.Types.ROWID;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+
+@EventDriven
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "select", "jdbc", "query", "database"})
+@CapabilityDescription("Execute provided SQL select query. Query result will 
be converted to Avro format."
+        + " Streaming is used so arbitrarily large result sets are supported. 
This processor can be scheduled to run on "
+        + "a timer, or cron expression, using the standard scheduling methods, 
or it can be triggered by an incoming FlowFile. "
+        + "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the "
+        + "select query. FlowFile attribute 'querydbtable.row.count' indicates 
how many rows were selected.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on 
the specified table, the maximum values for "
+        + "the specified column(s) will be retained for use in future 
executions of the query. This allows the Processor "
+        + "to fetch only those records that have max values greater than the 
retained values. This can be used for "
+        + "incremental fetching, fetching of newly added rows, etc. To clear 
the maximum values, clear the state of the processor "
+        + "per the State Management documentation")
+@WritesAttribute(attribute = "querydbtable.row.count")
+public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
+
+    // Name of the FlowFile attribute that onTrigger sets to the number of rows written to the FlowFile.
+    public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
+
+    // Allowable values of the "SQL Pre-processing Strategy" property.
+    public static final String SQL_PREPROCESS_STRATEGY_NONE = "None";
+    public static final String SQL_PREPROCESS_STRATEGY_ORACLE = "Oracle";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from SQL query result 
set.")
+            .build();
+
+    // Unmodifiable set of relationships; populated once in the constructor.
+    private final Set<Relationship> relationships;
+
+    // Controller service from which setup() and onTrigger() obtain their JDBC connections.
+    public static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain a 
connection to the database.")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    // Table name; appended verbatim after FROM by getSelectFromClause().
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the database table to be queried.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Optional column list; appended verbatim after SELECT by getSelectFromClause().
+    public static final PropertyDescriptor COLUMN_NAMES = new 
PropertyDescriptor.Builder()
+            .name("Columns to Return")
+            .description("A comma-separated list of column names to be used in 
the query. If your database requires "
+                    + "special treatment of the names (quoting, e.g.), each 
name should include such treatment. If no "
+                    + "column names are supplied, all columns in the specified 
table will be returned.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Optional list of columns whose maximum observed values are kept in processor state.
+    public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new 
PropertyDescriptor.Builder()
+            .name("Maximum-value Columns")
+            .description("A comma-separated list of column names. The 
processor will keep track of the maximum value "
+                    + "for each column that has been returned since the 
processor started running. This can be used to "
+                    + "retrieve only those rows that have been added/updated 
since the last retrieval. Note that some "
+                    + "JDBC types such as bit/boolean are not conducive to 
maintaining maximum value, so columns of these "
+                    + "types should not be listed in this property, and will 
result in error(s) during processing.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Query timeout; onTrigger converts it to whole seconds before calling Statement.setQueryTimeout().
+    public static final PropertyDescriptor QUERY_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Max Wait Time")
+            .description("The maximum amount of time allowed for a running SQL 
select query "
+                    + ", zero means there is no limit. Max time less than 1 
second will be equal to zero.")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Selects database-specific literal handling; see getLiteralByType() and MaxValueResultSetRowCollector.
+    public static final PropertyDescriptor SQL_PREPROCESS_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("SQL Pre-processing Strategy")
+            .description("The strategy to employ when generating the SQL for 
querying the table. A strategy may include "
+                    + "custom or database-specific code, such as the treatment 
of time/date formats.")
+            .required(true)
+            .allowableValues(SQL_PREPROCESS_STRATEGY_NONE, 
SQL_PREPROCESS_STRATEGY_ORACLE)
+            .defaultValue("None")
+            .build();
+
+
+    // Unmodifiable list of supported properties; populated once in the constructor.
+    private final List<PropertyDescriptor> propDescriptors;
+
+    // Lower-cased column name -> java.sql.Types code, (re)filled by setup() when the processor is scheduled.
+    protected final Map<String, Integer> columnTypeMap = new HashMap<>();
+
+    /**
+     * Creates the processor and freezes its supported relationships and property descriptors
+     * into unmodifiable collections.
+     */
+    public QueryDatabaseTable() {
+        final Set<Relationship> supportedRelationships = new HashSet<>();
+        Collections.addAll(supportedRelationships, REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(supportedRelationships);
+
+        // The order below is the order in which the descriptors are exposed.
+        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+        Collections.addAll(supportedProperties,
+                DBCP_SERVICE,
+                TABLE_NAME,
+                COLUMN_NAMES,
+                MAX_VALUE_COLUMN_NAMES,
+                QUERY_TIMEOUT,
+                SQL_PREPROCESS_STRATEGY);
+        this.propDescriptors = Collections.unmodifiableList(supportedProperties);
+    }
+
+    /**
+     * Returns the relationships of this processor.
+     *
+     * @return the unmodifiable set assembled in the constructor, which contains only {@code REL_SUCCESS}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Returns the property descriptors of this processor.
+     *
+     * @return the unmodifiable descriptor list assembled in the constructor
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    /**
+     * Populates {@code columnTypeMap} with the JDBC types of the configured maximum-value columns.
+     * <p>
+     * The types are discovered by running a query that returns no rows ({@code WHERE 1 = 0}) and reading the
+     * result set metadata. If no maximum-value columns are configured, the probe selects all columns of the
+     * table, because {@link #getSelectFromClause(String, String)} falls back to {@code *}.
+     *
+     * @param context the process context supplying the connection pool, table name and maximum-value columns
+     * @throws ProcessException if the probe returns no columns, or if the database cannot be queried
+     */
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        // Try to fill the columnTypeMap with the types of the desired max-value columns
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
+
+        // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
+        // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
+        // approach as in Apache Drill
+        final String query = getSelectFromClause(tableName, maxValueColumnNames).append(" WHERE 1 = 0").toString();
+
+        // The ResultSet is managed by try-with-resources too, so it is released even if reading metadata fails
+        try (final Connection con = dbcpService.getConnection();
+             final Statement st = con.createStatement();
+             final ResultSet resultSet = st.executeQuery(query)) {
+
+            final ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+            final int numCols = resultSetMetaData.getColumnCount();
+            if (numCols > 0) {
+                columnTypeMap.clear();
+                for (int i = 1; i <= numCols; i++) {
+                    // Keys are lower-cased; getQuery() and the row collector look columns up the same way
+                    final String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+                    final int colType = resultSetMetaData.getColumnType(i);
+                    columnTypeMap.put(colName, colType);
+                }
+
+            } else {
+                throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
+            }
+
+        } catch (SQLException e) {
+            throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        ProcessSession session = sessionFactory.createSession();
+        FlowFile fileToProcess = null;
+
+        final ProcessorLog logger = getLogger();
+
+        final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String columnNames = 
context.getProperty(COLUMN_NAMES).getValue();
+        final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
+        final String preProcessStrategy = 
context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve observed maximum values from 
the State Manager. Will not perform "
+                    + "query until this is accomplished.", ioe);
+            context.yield();
+            return;
+        }
+        // Make a mutable copy of the current state property map. This will be 
updated by the result row callback, and eventually
+        // set as the current state map (after the session has been committed)
+        final Map<String, String> statePropertyMap = new 
HashMap<>(stateMap.toMap());
+
+        final String selectQuery = getQuery(tableName, columnNames, 
getColumns(maxValueColumnNames), stateMap, preProcessStrategy);
+        final StopWatch stopWatch = new StopWatch(true);
+
+        try (final Connection con = dbcpService.getConnection();
+             final Statement st = con.createStatement()) {
+
+            final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+            st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+            final LongHolder nrOfRows = new LongHolder(0L);
+
+            fileToProcess = session.create();
+            fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException 
{
+                    try {
+                        logger.debug("Executing query {}", new 
Object[]{selectQuery});
+                        final ResultSet resultSet = 
st.executeQuery(selectQuery);
+                        // Max values will be updated in the state property 
map by the callback
+                        final MaxValueResultSetRowCollector maxValCollector = 
new MaxValueResultSetRowCollector(statePropertyMap, preProcessStrategy);
+                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, 
out, tableName, maxValCollector));
+
+                    } catch (final SQLException e) {
+                        throw new ProcessException("Error during database 
query or conversion of records to Avro", e);
+                    }
+                }
+            });
+
+            if (nrOfRows.get() > 0) {
+                // set attribute how many rows were selected
+                fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, nrOfRows.get().toString());
+
+                logger.info("{} contains {} Avro records; transferring to 
'success'",
+                        new Object[]{fileToProcess, nrOfRows.get()});
+                String jdbcURL = "DBCPService";
+                try {
+                    DatabaseMetaData databaseMetaData = con.getMetaData();
+                    if (databaseMetaData != null) {
+                        jdbcURL = databaseMetaData.getURL();
+                    }
+                } catch (SQLException se) {
+                    // Ignore and use default JDBC URL. This shouldn't happen 
unless the driver doesn't implement getMetaData() properly
+                }
+                session.getProvenanceReporter().receive(fileToProcess, 
jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.transfer(fileToProcess, REL_SUCCESS);
+            } else {
+                // If there were no rows returned, don't send the flowfile
+                session.remove(fileToProcess);
+                context.yield();
+            }
+
+        } catch (final ProcessException | SQLException e) {
+            logger.error("Unable to execute SQL select query {} due to {}", 
new Object[]{selectQuery, e});
+            if (fileToProcess != null) {
+                session.remove(fileToProcess);
+            }
+            context.yield();
+        } finally {
+            session.commit();
+            try {
+                // Update the state
+                stateManager.setState(statePropertyMap, Scope.CLUSTER);
+            } catch (IOException ioe) {
+                getLogger().error("{} failed to update State Manager, maximum 
observed values will not be recorded", new Object[]{this, ioe});
+            }
+        }
+    }
+
+    /**
+     * Splits a comma-separated column list into its individual, trimmed column names.
+     *
+     * @param commaSeparatedColumnList the raw list, may be null or empty
+     * @return the non-empty trimmed names in their original order; an empty list if the input is null or empty
+     */
+    protected List<String> getColumns(String commaSeparatedColumnList) {
+        if (StringUtils.isEmpty(commaSeparatedColumnList)) {
+            return Collections.emptyList();
+        }
+        final String[] tokens = commaSeparatedColumnList.split(",");
+        final List<String> names = new ArrayList<>(tokens.length);
+        for (final String token : tokens) {
+            final String name = token.trim();
+            // Entries that are blank after trimming (e.g. from "a, ,b") are dropped
+            if (name.isEmpty()) {
+                continue;
+            }
+            names.add(name);
+        }
+        return names;
+    }
+
+    /**
+     * Builds the SELECT statement for the table. For every maximum-value column that has a non-empty value
+     * in the given state, a {@code column > literal} condition is added; the conditions are joined with AND.
+     *
+     * @param tableName          the table to query, must not be empty
+     * @param columnNames        comma-separated columns to select, or null/empty for all columns
+     * @param maxValColumnNames  the maximum-value columns, may be null
+     * @param stateMap           the stored state holding the last maximum values, may be null
+     * @param preProcessStrategy the SQL pre-processing strategy passed on to the literal formatting
+     * @return the SQL query text
+     * @throws IllegalArgumentException if the table name is empty, or if a column with a stored maximum value
+     *                                  has no entry in {@code columnTypeMap}
+     */
+    protected String getQuery(String tableName, String columnNames, List<String> maxValColumnNames,
+                              StateMap stateMap, String preProcessStrategy) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name must be specified");
+        }
+        final StringBuilder sql = new StringBuilder(getSelectFromClause(tableName, columnNames));
+
+        // Without stored state (version -1) or max-value columns there is nothing to filter on
+        final boolean filterOnState = stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null;
+        if (!filterOnState) {
+            return sql.toString();
+        }
+
+        final Map<String, String> storedMaxValues = stateMap.toMap();
+        final List<String> conditions = new ArrayList<>(maxValColumnNames.size());
+        for (final String column : maxValColumnNames) {
+            final String key = column.toLowerCase();
+            final String storedMax = storedMaxValues.get(key);
+            if (StringUtils.isEmpty(storedMax)) {
+                continue;
+            }
+            final Integer jdbcType = columnTypeMap.get(key);
+            if (jdbcType == null) {
+                // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
+                throw new IllegalArgumentException("No column type found for: " + column);
+            }
+            conditions.add(column + " > " + getLiteralByType(jdbcType, storedMax, preProcessStrategy));
+        }
+
+        if (!conditions.isEmpty()) {
+            sql.append(" WHERE ").append(StringUtils.join(conditions, " AND "));
+        }
+        return sql.toString();
+    }
+
+    /**
+     * Builds the leading {@code SELECT ... FROM ...} part of a query. When the column list is null, empty,
+     * or just the wildcard, {@code *} is selected; otherwise the list is inserted exactly as given.
+     *
+     * @param tableName   The name of the table to select from
+     * @param columnNames A comma-separated list of column names to select from the table
+     * @return A new builder holding the SELECT statement, so callers can append further clauses
+     */
+    protected StringBuilder getSelectFromClause(String tableName, String columnNames) {
+        final boolean selectAll = StringUtils.isEmpty(columnNames) || "*".equals(columnNames.trim());
+        final String projection = selectAll ? "*" : columnNames;
+        return new StringBuilder("SELECT ")
+                .append(projection)
+                .append(" FROM ")
+                .append(tableName);
+    }
+
+    /**
+     * Returns a SQL literal for the given value based on its type. For 
example, values of character type need to be enclosed
+     * in single quotes, whereas values of numeric type should not be.
+     *
+     * @param type  The JDBC type for the desired literal
+     * @param value The value to be converted to a SQL literal
+     * @return A String representing the given value as a literal of the given 
type
+     */
+    protected String getLiteralByType(int type, String value, String 
preProcessStrategy) {
+        // Format value based on column type. For example, strings and 
timestamps need to be quoted
+        switch (type) {
+            // For string-represented values, put in single quotes
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case ROWID:
+            case DATE:
+            case TIME:
+                return "'" + value + "'";
+            case TIMESTAMP:
+                // Timestamp literals in Oracle need to be cast with TO_DATE
+                if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) 
{
+                    return "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')";
+                } else {
+                    return "'" + value + "'";
+                }
+                // Else leave as is (numeric types, e.g.)
+            default:
+                return value;
+        }
+    }
+
+    /**
+     * Row callback that keeps, for every column present in {@code columnTypeMap}, the largest value seen so
+     * far. Values are stored as strings in the map handed to the constructor, keyed by lower-cased column name.
+     */
+    protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
+        String preProcessStrategy;
+        Map<String, String> newColMap;
+
+        /**
+         * @param stateMap           the map to update in place with new maximum values (it is not copied)
+         * @param preProcessStrategy the SQL pre-processing strategy; selects the TIMESTAMP handling
+         */
+        public MaxValueResultSetRowCollector(Map<String, String> stateMap, String preProcessStrategy) {
+            this.preProcessStrategy = preProcessStrategy;
+            newColMap = stateMap;
+        }
+
+        /**
+         * Compares each tracked, non-null column of the current row with the stored maximum and replaces the
+         * stored value when the row's value is greater.
+         *
+         * @param resultSet the result set positioned on the row to inspect; a null result set is ignored
+         * @throws IOException if a tracked column has a type that cannot be ordered, if a stored maximum
+         *                     cannot be parsed as a number, or if the row cannot be read
+         */
+        @Override
+        public void processRow(ResultSet resultSet) throws IOException {
+            if (resultSet == null) {
+                return;
+            }
+            try {
+                // Iterate over the row, check-and-set max values
+                final ResultSetMetaData meta = resultSet.getMetaData();
+                final int nrOfColumns = meta.getColumnCount();
+                for (int i = 1; i <= nrOfColumns; i++) {
+                    final String colName = meta.getColumnName(i).toLowerCase();
+                    final Integer type = columnTypeMap.get(colName);
+                    // Skip any columns we're not keeping track of or whose value is null
+                    if (type == null || resultSet.getObject(i) == null) {
+                        continue;
+                    }
+                    final String maxValueString = newColMap.get(colName);
+                    switch (type) {
+                        case CHAR:
+                        case LONGNVARCHAR:
+                        case LONGVARCHAR:
+                        case NCHAR:
+                        case NVARCHAR:
+                        case VARCHAR:
+                        case ROWID:
+                            final String colStringValue = resultSet.getString(i);
+                            if (maxValueString == null || colStringValue.compareTo(maxValueString) > 0) {
+                                newColMap.put(colName, colStringValue);
+                            }
+                            break;
+
+                        case INTEGER:
+                        case SMALLINT:
+                        case TINYINT:
+                            final int colIntValue = resultSet.getInt(i);
+                            if (maxValueString == null || colIntValue > Integer.parseInt(maxValueString)) {
+                                newColMap.put(colName, Integer.toString(colIntValue));
+                            }
+                            break;
+
+                        case BIGINT:
+                            final long colLongValue = resultSet.getLong(i);
+                            if (maxValueString == null || colLongValue > Long.parseLong(maxValueString)) {
+                                newColMap.put(colName, Long.toString(colLongValue));
+                            }
+                            break;
+
+                        case FLOAT:
+                        case REAL:
+                        case DOUBLE:
+                            final double colDoubleValue = resultSet.getDouble(i);
+                            if (maxValueString == null || colDoubleValue > Double.parseDouble(maxValueString)) {
+                                newColMap.put(colName, Double.toString(colDoubleValue));
+                            }
+                            break;
+
+                        case DECIMAL:
+                        case NUMERIC:
+                            final BigDecimal colBigDecimalValue = resultSet.getBigDecimal(i);
+                            // The stored value was produced by BigDecimal.toString(), so parse it with the
+                            // matching BigDecimal(String) constructor. A default-locale DecimalFormat would
+                            // misread it where '.' is a grouping separator.
+                            final BigDecimal maxBigDecimalValue = (maxValueString == null) ? null : new BigDecimal(maxValueString);
+                            if (maxBigDecimalValue == null || colBigDecimalValue.compareTo(maxBigDecimalValue) > 0) {
+                                newColMap.put(colName, colBigDecimalValue.toString());
+                            }
+                            break;
+
+                        case DATE:
+                            final Date rawColDateValue = resultSet.getDate(i);
+                            final java.sql.Date colDateValue = new java.sql.Date(rawColDateValue.getTime());
+                            final java.sql.Date maxDateValue = (maxValueString == null) ? null : java.sql.Date.valueOf(maxValueString);
+                            if (maxDateValue == null || colDateValue.after(maxDateValue)) {
+                                newColMap.put(colName, colDateValue.toString());
+                            }
+                            break;
+
+                        case TIME:
+                            // TIME columns are read with getTime(); getDate() is not a supported conversion
+                            // for SQL TIME and does not yield the time-of-day being compared here
+                            final java.sql.Time rawColTimeValue = resultSet.getTime(i);
+                            final java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime());
+                            final java.sql.Time maxTimeValue = (maxValueString == null) ? null : java.sql.Time.valueOf(maxValueString);
+                            if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
+                                newColMap.put(colName, colTimeValue.toString());
+                            }
+                            break;
+
+                        case TIMESTAMP:
+                            // Oracle timestamp queries must use literals in java.sql.Date format
+                            if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) {
+                                final Date rawColOracleTimestampValue = resultSet.getDate(i);
+                                final java.sql.Date oracleTimestampValue = new java.sql.Date(rawColOracleTimestampValue.getTime());
+                                final java.sql.Date maxOracleTimestampValue =
+                                        (maxValueString == null) ? null : java.sql.Date.valueOf(maxValueString);
+                                if (maxOracleTimestampValue == null || oracleTimestampValue.after(maxOracleTimestampValue)) {
+                                    newColMap.put(colName, oracleTimestampValue.toString());
+                                }
+                            } else {
+                                final Timestamp rawColTimestampValue = resultSet.getTimestamp(i);
+                                final java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime());
+                                final java.sql.Timestamp maxTimestampValue =
+                                        (maxValueString == null) ? null : java.sql.Timestamp.valueOf(maxValueString);
+                                if (maxTimestampValue == null || colTimestampValue.after(maxTimestampValue)) {
+                                    newColMap.put(colName, colTimestampValue.toString());
+                                }
+                            }
+                            break;
+
+                        case BIT:
+                        case BOOLEAN:
+                        case BINARY:
+                        case VARBINARY:
+                        case LONGVARBINARY:
+                        case ARRAY:
+                        case BLOB:
+                        case CLOB:
+                        default:
+                            throw new IOException("Type " + meta.getColumnTypeName(i) + " is not valid for maintaining maximum value");
+                    }
+                }
+            } catch (NumberFormatException | SQLException e) {
+                // An unparseable stored maximum is reported like any other row-processing failure
+                throw new IOException(e);
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/98395de7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 1247919..3b74c19 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -69,17 +69,30 @@ import org.apache.commons.lang3.StringUtils;
 public class JdbcCommon {
 
     public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream) throws SQLException, IOException {
-        final Schema schema = createSchema(rs);
+        return convertToAvroStream(rs, outStream, null, null);
+    }
+
+    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream, String recordName)
+            throws SQLException, IOException {
+        return convertToAvroStream(rs, outStream, recordName, null);
+    }
+
+    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream, String recordName, ResultSetRowCallback callback)
+            throws SQLException, IOException {
+        final Schema schema = createSchema(rs, recordName);
         final GenericRecord rec = new GenericData.Record(schema);
 
-        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<GenericRecord>(schema);
-        try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<GenericRecord>(datumWriter)) {
+        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(datumWriter)) {
             dataFileWriter.create(schema, outStream);
 
             final ResultSetMetaData meta = rs.getMetaData();
             final int nrOfColumns = meta.getColumnCount();
             long nrOfRows = 0;
             while (rs.next()) {
+                if (callback != null) {
+                    callback.processRow(rs);
+                }
                 for (int i = 1; i <= nrOfColumns; i++) {
                     final int javaSqlType = meta.getColumnType(i);
                     final Object value = rs.getObject(i);
@@ -125,10 +138,23 @@ public class JdbcCommon {
     }
 
     public static Schema createSchema(final ResultSet rs) throws SQLException {
+        return createSchema(rs, null);
+    }
+
+    /**
+     * Creates an Avro schema from a result set. If the table/record name is 
known a priori and provided, use that as a
+     * fallback for the record name if it cannot be retrieved from the result 
set, and finally fall back to a default value.
+     *
+     * @param rs         The result set to convert to Avro
+     * @param recordName The a priori record name to use if it cannot be 
determined from the result set.
+     * @return A Schema object representing the result set converted to an 
Avro record
+     * @throws SQLException if any error occurs during conversion
+     */
+    public static Schema createSchema(final ResultSet rs, String recordName) 
throws SQLException {
         final ResultSetMetaData meta = rs.getMetaData();
         final int nrOfColumns = meta.getColumnCount();
-        String tableName = "NiFi_ExecuteSQL_Record";
-        if(nrOfColumns > 0) {
+        String tableName = StringUtils.isEmpty(recordName) ? 
"NiFi_ExecuteSQL_Record" : recordName;
+        if (nrOfColumns > 0) {
             String tableNameFromMeta = meta.getTableName(1);
             if (!StringUtils.isBlank(tableNameFromMeta)) {
                 tableName = tableNameFromMeta;
@@ -218,4 +244,13 @@ public class JdbcCommon {
 
         return builder.endRecord();
     }
+
+    /**
+     * An interface for callback methods which allows processing of a row 
during the convertToAvroStream() processing.
+     * <b>IMPORTANT:</b> Implementations should only work on the row pointed at by 
the current ResultSet reference.
+     * Advancing the cursor, for example, can cause rows to be skipped during Avro 
transformation.
+     */
+    public interface ResultSetRowCallback {
+        void processRow(ResultSet resultSet) throws IOException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/98395de7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d3d765e..6c52d28 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -65,6 +65,7 @@ org.apache.nifi.processors.standard.PutJMS
 org.apache.nifi.processors.standard.PutSFTP
 org.apache.nifi.processors.standard.PutSQL
 org.apache.nifi.processors.standard.PutSyslog
+org.apache.nifi.processors.standard.QueryDatabaseTable
 org.apache.nifi.processors.standard.ReplaceText
 org.apache.nifi.processors.standard.RouteText
 org.apache.nifi.processors.standard.ReplaceTextWithMapping

http://git-wip-us.apache.org/repos/asf/nifi/blob/98395de7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
new file mode 100644
index 0000000..d16b9c6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the QueryDatabaseTable processor
+ */
+public class QueryDatabaseTableTest {
+
+    MockQueryDatabaseTable processor;
+    private TestRunner runner;
+    final static String DB_LOCATION = "target/db";
+
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    @Before
+    public void setup() throws InitializationException, IOException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        processor = new MockQueryDatabaseTable();
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
+        runner.getStateManager().clear(Scope.CLUSTER);
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testGetColumns() throws Exception {
+        assertTrue(processor.getColumns(null).isEmpty());
+        assertTrue(processor.getColumns("").isEmpty());
+        assertEquals(2, processor.getColumns("col1,col2").size());
+    }
+
+    @Test
+    public void testGetQuery() throws Exception {
+        String query = processor.getQuery("myTable", null, null, null, "None");
+        assertEquals("SELECT * FROM myTable", query);
+        query = processor.getQuery("myTable", "col1,col2", null, null, "None");
+        assertEquals("SELECT col1,col2 FROM myTable", query);
+
+        query = processor.getQuery("myTable", null, 
Collections.singletonList("id"), null, "None");
+        assertEquals("SELECT * FROM myTable", query);
+
+        Map<String, String> maxValues = new HashMap<>();
+        maxValues.put("id", "509");
+        StateManager stateManager = runner.getStateManager();
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        processor.putColumnType("id", Types.INTEGER);
+        query = processor.getQuery("myTable", null, 
Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER), "None");
+        assertEquals("SELECT * FROM myTable WHERE id > 509", query);
+
+        maxValues.put("date_created", "2016-03-07 12:34:56");
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        processor.putColumnType("date_created", Types.TIMESTAMP);
+        query = processor.getQuery("myTable", null, Arrays.asList("id", 
"DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "None");
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > 
'2016-03-07 12:34:56'", query);
+        // Test Oracle strategy
+        query = processor.getQuery("myTable", null, Arrays.asList("id", 
"DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "Oracle");
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > 
to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetQueryNoTable() throws Exception {
+        processor.getQuery(null, null, null, null, "None");
+    }
+
+    @Test
+    public void testAddedRows() throws ClassNotFoundException, SQLException, 
InitializationException, IOException {
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+
+        InputStream in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(3, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flowfile with one new 
row should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+
+        // Sanity check - run again, this time no flowfiles/rows should be 
transferred
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+
+        // Add timestamp as a max value column name
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, 
created_on");
+
+        // Add a new row with a higher ID and run, one flow file will be 
transferred because no max value for the timestamp has been stored
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID but lower timestamp and run, no flow 
file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (5, 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be 
transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all 
rows should be returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(7, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a "higher" name than the max but lower than 
"NULL" (to test that null values are skipped), one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (7, 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set scale as the max value column name (and clear the state), all 
rows should be returned since the max value for scale has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(8, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for scale than the max, one flow 
file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (8, 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Set bignum as the max value column name (and clear the state), all 
rows should be returned since the max value for bignum has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, 
"bignum");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(9, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for bignum than the max, one flow 
file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on, bignum) VALUES (9, 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 
1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+    }
+
+
+    @Test
+    public void testWithNullIntColumn() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, 
NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 
1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTable.RESULT_ROW_COUNT,
 "2");
+    }
+
+    @Test
+    public void testWithSqlException() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NO_ROWS");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statement that will generate an error (val1 does 
not exist, e.g.)
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NO_ROWS");
+        runner.setProperty(QueryDatabaseTable.COLUMN_NAMES, "val1");
+        runner.run();
+
+        
assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
+    }
+
+    private long getNumberOfRecordsFromStream(InputStream in) throws 
IOException {
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
+            GenericRecord record = null;
+            long recordsFromStream = 0;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us 
from
+                // allocating and garbage collecting many objects for files 
with
+                // many items.
+                record = dataFileReader.next(record);
+                recordsFromStream += 1;
+            }
+
+            return recordsFromStream;
+        }
+    }
+
+    /**
+     * Simple implementation only for QueryDatabaseTable processor testing.
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements 
DBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION 
+ ";create=true");
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+
+    @Stateful(scopes = Scope.CLUSTER, description = "Mock for 
QueryDatabaseTable processor")
+    private static class MockQueryDatabaseTable extends QueryDatabaseTable {
+        public void putColumnType(String colName, Integer colType) {
+            columnTypeMap.put(colName, colType);
+        }
+    }
+}
\ No newline at end of file

Reply via email to