This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d617c0b NIFI-6865: Added Fetch Size property to ExecuteSQL processors
d617c0b is described below
commit d617c0b96ab5d2f42c467643aa55cc4ab9a70b27
Author: Matthew Burgess <[email protected]>
AuthorDate: Thu Nov 14 10:48:45 2019 -0500
NIFI-6865: Added Fetch Size property to ExecuteSQL processors
This closes #3888.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../processors/standard/AbstractExecuteSQL.java | 21 +++++++++++++++++++++
.../apache/nifi/processors/standard/ExecuteSQL.java | 1 +
.../nifi/processors/standard/ExecuteSQLRecord.java | 1 +
.../nifi/processors/standard/TestExecuteSQL.java | 1 +
4 files changed, 24 insertions(+)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 700e92e..eeee845 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -155,6 +155,17 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor FETCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("esql-fetch-size")
+ .displayName("Fetch Size")
+ .description("The number of result rows to be fetched from the
result set at a time. This is a hint to the database driver and may not be "
+ + "honored and/or exact. If the value specified is zero,
then the hint is ignored.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
protected List<PropertyDescriptor> propDescriptors;
protected DBCPService dbcpService;
@@ -203,6 +214,8 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
final Integer maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField =
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 :
outputBatchSizeField;
+ final Integer fetchSize =
context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+
List<String> preQueries =
getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
List<String> postQueries =
getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
@@ -222,6 +235,14 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess ==
null ? Collections.emptyMap() : fileToProcess.getAttributes());
final PreparedStatement st = con.prepareStatement(selectQuery)) {
+ if (fetchSize != null && fetchSize > 0) {
+ try {
+ st.setFetchSize(fetchSize);
+ } catch (SQLException se) {
+ // Not all drivers support this, just log the error (at
debug level) and move on
+ logger.debug("Cannot set fetch size to {} due to {}", new
Object[]{fetchSize, se.getLocalizedMessage()}, se);
+ }
+ }
st.setQueryTimeout(queryTimeout); // timeout in seconds
// Execute pre-query, throw exception and cleanup Flow Files if
fail
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index f058b77..03010bb 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -131,6 +131,7 @@ public class ExecuteSQL extends AbstractExecuteSQL {
pds.add(DEFAULT_SCALE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
+ pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 897a929..c9d80eb 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -133,6 +133,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
+ pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index efecbf1..0c6da65 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -360,6 +360,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+ runner.setProperty(AbstractExecuteSQL.FETCH_SIZE, "5");
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
runner.run();