This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d617c0b  NIFI-6865: Added Fetch Size property to ExecuteSQL processors
d617c0b is described below

commit d617c0b96ab5d2f42c467643aa55cc4ab9a70b27
Author: Matthew Burgess <[email protected]>
AuthorDate: Thu Nov 14 10:48:45 2019 -0500

    NIFI-6865: Added Fetch Size property to ExecuteSQL processors
    
    This closes #3888.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../processors/standard/AbstractExecuteSQL.java     | 21 +++++++++++++++++++++
 .../apache/nifi/processors/standard/ExecuteSQL.java |  1 +
 .../nifi/processors/standard/ExecuteSQLRecord.java  |  1 +
 .../nifi/processors/standard/TestExecuteSQL.java    |  1 +
 4 files changed, 24 insertions(+)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 700e92e..eeee845 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -155,6 +155,17 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("esql-fetch-size")
+            .displayName("Fetch Size")
+            .description("The number of result rows to be fetched from the 
result set at a time. This is a hint to the database driver and may not be "
+                    + "honored and/or exact. If the value specified is zero, 
then the hint is ignored.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
     protected List<PropertyDescriptor> propDescriptors;
 
     protected DBCPService dbcpService;
@@ -203,6 +214,8 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
         final Integer maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
         final Integer outputBatchSizeField = 
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final int outputBatchSize = outputBatchSizeField == null ? 0 : 
outputBatchSizeField;
+        final Integer fetchSize = 
context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+
         List<String> preQueries = 
getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
         List<String> postQueries = 
getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
 
@@ -222,6 +235,14 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
         int resultCount = 0;
         try (final Connection con = dbcpService.getConnection(fileToProcess == 
null ? Collections.emptyMap() : fileToProcess.getAttributes());
              final PreparedStatement st = con.prepareStatement(selectQuery)) {
+            if (fetchSize != null && fetchSize > 0) {
+                try {
+                    st.setFetchSize(fetchSize);
+                } catch (SQLException se) {
+                    // Not all drivers support this, just log the error (at 
debug level) and move on
+                    logger.debug("Cannot set fetch size to {} due to {}", new 
Object[]{fetchSize, se.getLocalizedMessage()}, se);
+                }
+            }
             st.setQueryTimeout(queryTimeout); // timeout in seconds
 
             // Execute pre-query, throw exception and cleanup Flow Files if 
fail
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index f058b77..03010bb 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -131,6 +131,7 @@ public class ExecuteSQL extends AbstractExecuteSQL {
         pds.add(DEFAULT_SCALE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
+        pds.add(FETCH_SIZE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 897a929..c9d80eb 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -133,6 +133,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
         pds.add(USE_AVRO_LOGICAL_TYPES);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
+        pds.add(FETCH_SIZE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index efecbf1..0c6da65 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -360,6 +360,7 @@ public class TestExecuteSQL {
 
         runner.setIncomingConnection(false);
         runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(AbstractExecuteSQL.FETCH_SIZE, "5");
         runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
         runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
         runner.run();

Reply via email to