This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 96b53d8 NIFI-9589: Support initial loading from the current max
values in QueryDatabaseTable* processors
96b53d8 is described below
commit 96b53d894375e98370d2bc41c8f4b3a9a1f1881d
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Jan 19 04:07:57 2022 +0100
NIFI-9589: Support initial loading from the current max values in
QueryDatabaseTable* processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #5676.
---
.../standard/AbstractQueryDatabaseTable.java | 69 +++++++++++++-
.../processors/standard/QueryDatabaseTable.java | 1 +
.../standard/QueryDatabaseTableRecord.java | 1 +
.../standard/QueryDatabaseTableRecordTest.java | 102 ++++++++++++++++++++
.../standard/QueryDatabaseTableTest.java | 104 +++++++++++++++++++++
5 files changed, 276 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 2aa39e0..03df629 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -21,6 +21,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
@@ -49,6 +51,7 @@ import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +60,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -141,6 +145,20 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
.allowableValues(TRANSACTION_NONE,TRANSACTION_READ_COMMITTED,
TRANSACTION_READ_UNCOMMITTED, TRANSACTION_REPEATABLE_READ,
TRANSACTION_SERIALIZABLE)
.build();
+ public static final AllowableValue INITIAL_LOAD_STRATEGY_ALL_ROWS = new
AllowableValue("Start at Beginning", "Start at Beginning", "Loads all existing
rows from the database table.");
+ public static final AllowableValue INITIAL_LOAD_STRATEGY_NEW_ROWS = new
AllowableValue("Start at Current Maximum Values", "Start at Current Maximum
Values", "Loads only the newly " +
+ "inserted or updated rows based on the maximum value(s) of the
column(s) configured in the '" + MAX_VALUE_COLUMN_NAMES.getDisplayName() + "'
property.");
+
+ public static final PropertyDescriptor INITIAL_LOAD_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("initial-load-strategy")
+ .displayName("Initial Load Strategy")
+ .description("How to handle existing rows in the database table
when the processor is started for the first time (or its state has been
cleared). The property will be ignored, " +
+ "if any '" + INITIAL_MAX_VALUE_PROP_START + "*' dynamic
property has also been configured.")
+ .required(true)
+ .allowableValues(INITIAL_LOAD_STRATEGY_ALL_ROWS,
INITIAL_LOAD_STRATEGY_NEW_ROWS)
+ .defaultValue(INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue())
+ .build();
+
@Override
public Set<Relationship> getRelationships() {
return relationships;
@@ -163,6 +181,24 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
.build();
}
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
+ final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final boolean maxValueColumnNames =
validationContext.getProperty(MAX_VALUE_COLUMN_NAMES).isSet();
+ final String initialLoadStrategy =
validationContext.getProperty(INITIAL_LOAD_STRATEGY).getValue();
+ if (!maxValueColumnNames &&
initialLoadStrategy.equals(INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue())) {
+ results.add(new ValidationResult.Builder().valid(false)
+ .subject(INITIAL_LOAD_STRATEGY.getDisplayName())
+ .input(INITIAL_LOAD_STRATEGY_NEW_ROWS.getDisplayName())
+ .explanation(String.format("'%s' strategy can only be used
when '%s' property is also configured",
+ INITIAL_LOAD_STRATEGY_NEW_ROWS.getDisplayName(),
MAX_VALUE_COLUMN_NAMES.getDisplayName()))
+ .build());
+ }
+
+ return results;
+ }
+
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context, null);
@@ -191,7 +227,9 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
final String columnNames =
context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String sqlQuery =
context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames =
context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+ final String initialLoadStrategy =
context.getProperty(INITIAL_LOAD_STRATEGY).getValue();
final String customWhereClause =
context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
+ final Integer queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
final Integer fetchSize =
context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final Integer maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField =
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -241,6 +279,36 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
List<String> maxValueColumnNameList =
StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+
+ if (maxValueColumnNameList != null && statePropertyMap.isEmpty() &&
initialLoadStrategy.equals(INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue())) {
+ final String columnsClause = maxValueColumnNameList.stream()
+ .map(columnName -> String.format("MAX(%s) %s", columnName,
columnName))
+ .collect(Collectors.joining(", "));
+
+ final String selectMaxQuery =
dbAdapter.getSelectStatement(tableName, columnsClause, null, null, null, null);
+
+ try (final Connection con =
dbcpService.getConnection(Collections.emptyMap());
+ final Statement st = con.createStatement()) {
+
+ if (transIsolationLevel != null) {
+ con.setTransactionIsolation(transIsolationLevel);
+ }
+
+ st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+ try (final ResultSet resultSet =
st.executeQuery(selectMaxQuery)) {
+ if (resultSet.next()) {
+ final MaxValueResultSetRowCollector maxValCollector =
new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+ maxValCollector.processRow(resultSet);
+ maxValCollector.applyStateChanges();
+ }
+ }
+ } catch (final Exception e) {
+ logger.error("Unable to execute SQL select query {} due to
{}", new Object[]{selectMaxQuery, e});
+ context.yield();
+ }
+ }
+
final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery,
columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
@@ -271,7 +339,6 @@ public abstract class AbstractQueryDatabaseTable extends
AbstractDatabaseFetchPr
// Ignore and use default JDBC URL. This shouldn't happen
unless the driver doesn't implement getMetaData() properly
}
- final Integer queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[] { selectQuery
});
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index d650386..1af871a 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -103,6 +103,7 @@ public class QueryDatabaseTable extends
AbstractQueryDatabaseTable {
pds.add(WHERE_CLAUSE);
pds.add(SQL_QUERY);
pds.add(MAX_VALUE_COLUMN_NAMES);
+ pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 8a2fd93..4f5a3e5 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -125,6 +125,7 @@ public class QueryDatabaseTableRecord extends
AbstractQueryDatabaseTable {
pds.add(SQL_QUERY);
pds.add(RECORD_WRITER_FACTORY);
pds.add(MAX_VALUE_COLUMN_NAMES);
+ pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
index 3f8a489..f46dcc5 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -949,6 +949,108 @@ public class QueryDatabaseTableRecordTest {
}
@Test
+ public void testInitialLoadStrategyStartAtBeginning() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 10; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" +
dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" +
TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"created_on");
+ runner.setProperty(QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY,
QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue());
+
+ // Initial run with no previous state. Should get all 10 records
+ runner.run();
+
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "10");
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialLoadStrategyStartAtCurrentMaximumValues() throws
SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 10; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" +
dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" +
TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES,
"created_on");
+ runner.setProperty(QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY,
QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue());
+
+ // Initial run with no previous state. Should not get any records but
store Max Value in the state
+ runner.run();
+
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
public void testAddedRowsCustomWhereClause() throws SQLException,
IOException {
// load test data to database
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8b51fe2..d7eec01 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -970,6 +970,110 @@ public class QueryDatabaseTableTest {
}
@Test
+ public void testInitialLoadStrategyStartAtBeginning() throws SQLException,
IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ InputStream in;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount=0;
+ //create larger row set
+ for(int batch=0;batch<10;batch++){
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" +
dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" +
TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES,
"created_on");
+ runner.setProperty(QueryDatabaseTable.INITIAL_LOAD_STRATEGY,
QueryDatabaseTable.INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue());
+
+ // Initial run with no previous state. Should get all 10 records
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS,
1);
+ in = new
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(10, getNumberOfRecordsFromStream(in));
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS,
0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialLoadStrategyStartAtCurrentMaximumValues() throws
SQLException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ InputStream in;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount=0;
+ //create larger row set
+ for(int batch=0;batch<10;batch++){
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" +
dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" +
TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES,
"created_on");
+ runner.setProperty(QueryDatabaseTable.INITIAL_LOAD_STRATEGY,
QueryDatabaseTable.INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue());
+
+ // Initial run with no previous state. Should not get any records but
store Max Value in the state
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS,
0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS,
0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" +
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01
00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
public void testAddedRowsCustomWhereClause() throws
ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database