Repository: nifi Updated Branches: refs/heads/master c3d54ab72 -> 65b26e6f4
NIFI-1691: Add Fetch Size property to QueryDatabaseTable This closes #307 Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65b26e6f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65b26e6f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65b26e6f Branch: refs/heads/master Commit: 65b26e6f41e9de5a198a1b310811ffa825104923 Parents: c3d54ab Author: Matt Burgess <[email protected]> Authored: Mon Mar 28 12:16:24 2016 -0400 Committer: Bryan Bende <[email protected]> Committed: Tue Mar 29 10:51:30 2016 -0400 ---------------------------------------------------------------------- .../processors/standard/QueryDatabaseTable.java | 21 ++++++++++++++++++++ .../standard/QueryDatabaseTableTest.java | 1 + 2 files changed, 22 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/65b26e6f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 08f6b41..9403eb8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -175,6 +175,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { .defaultValue("None") .build(); + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("Fetch Size") + .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be " + + "honored and/or exact. If the value specified is zero, then the hint is ignored.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + private final List<PropertyDescriptor> propDescriptors; @@ -192,6 +201,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { pds.add(MAX_VALUE_COLUMN_NAMES); pds.add(QUERY_TIMEOUT); pds.add(SQL_PREPROCESS_STRATEGY); + pds.add(FETCH_SIZE); propDescriptors = Collections.unmodifiableList(pds); } @@ -251,6 +261,8 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue(); + final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); + final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -272,6 +284,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { + if (fetchSize != null && fetchSize > 0) { + try { + st.setFetchSize(fetchSize); + } catch (SQLException se) { + // Not all drivers support this, just log the error (at debug level) and move on + logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se); + } + } + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds http://git-wip-us.apache.org/repos/asf/nifi/blob/65b26e6f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index d16b9c6..f932e4d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -154,6 +154,7 @@ public class QueryDatabaseTableTest { runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); assertEquals(3, getNumberOfRecordsFromStream(in)); runner.clearTransferState();
