Repository: nifi
Updated Branches:
  refs/heads/master c3d54ab72 -> 65b26e6f4


NIFI-1691: Add Fetch Size property to QueryDatabaseTable

This closes #307

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65b26e6f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65b26e6f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65b26e6f

Branch: refs/heads/master
Commit: 65b26e6f41e9de5a198a1b310811ffa825104923
Parents: c3d54ab
Author: Matt Burgess <[email protected]>
Authored: Mon Mar 28 12:16:24 2016 -0400
Committer: Bryan Bende <[email protected]>
Committed: Tue Mar 29 10:51:30 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 21 ++++++++++++++++++++
 .../standard/QueryDatabaseTableTest.java        |  1 +
 2 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/65b26e6f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 08f6b41..9403eb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -175,6 +175,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
             .defaultValue("None")
             .build();
 
+    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Fetch Size")
+            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+                    + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
 
     private final List<PropertyDescriptor> propDescriptors;
 
@@ -192,6 +201,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         pds.add(MAX_VALUE_COLUMN_NAMES);
         pds.add(QUERY_TIMEOUT);
         pds.add(SQL_PREPROCESS_STRATEGY);
+        pds.add(FETCH_SIZE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -251,6 +261,8 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
         final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
+        final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
+
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
 
@@ -272,6 +284,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         try (final Connection con = dbcpService.getConnection();
              final Statement st = con.createStatement()) {
 
+            if (fetchSize != null && fetchSize > 0) {
+                try {
+                    st.setFetchSize(fetchSize);
+                } catch (SQLException se) {
+                    // Not all drivers support this, just log the error (at debug level) and move on
+                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+                }
+            }
+
             final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
             st.setQueryTimeout(queryTimeout); // timeout in seconds
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/65b26e6f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index d16b9c6..f932e4d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -154,6 +154,7 @@ public class QueryDatabaseTableTest {
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
 
         InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
         assertEquals(3, getNumberOfRecordsFromStream(in));
         runner.clearTransferState();
 

Reply via email to