This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 4ef2251 NIFI-3611: Added ability to set Transaction Isolation Level
on Database connection for the QueryDatabaseTable processor
4ef2251 is described below
commit 4ef2251d74e25e0e55ea911e7041fc97b1497ae9
Author: erichanson5 <[email protected]>
AuthorDate: Mon Jan 7 15:53:58 2019 -0500
NIFI-3611: Added ability to set Transaction Isolation Level on Database
connection for the QueryDatabaseTable processor
NIFI-3611: Make TRANS_ISOLATION_LEVEL property optional
This closes #3248.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../standard/AbstractQueryDatabaseTable.java | 38 +++++++++++++++++++++-
.../processors/standard/QueryDatabaseTable.java | 1 +
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 57933b3..6b166d9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@@ -65,6 +66,27 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
+ private static AllowableValue TRANSACTION_READ_COMMITTED = new
AllowableValue(
+ String.valueOf(Connection.TRANSACTION_READ_COMMITTED),
+ "TRANSACTION_READ_COMMITTED"
+ );
+ private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new
AllowableValue(
+ String.valueOf(Connection.TRANSACTION_READ_UNCOMMITTED),
+ "TRANSACTION_READ_UNCOMMITTED"
+ );
+ private static AllowableValue TRANSACTION_REPEATABLE_READ = new
AllowableValue(
+ String.valueOf(Connection.TRANSACTION_REPEATABLE_READ),
+ "TRANSACTION_REPEATABLE_READ"
+ );
+ private static AllowableValue TRANSACTION_NONE = new AllowableValue(
+ String.valueOf(Connection.TRANSACTION_NONE),
+ "TRANSACTION_NONE"
+ );
+ private static AllowableValue TRANSACTION_SERIALIZABLE = new
AllowableValue(
+ String.valueOf(Connection.TRANSACTION_SERIALIZABLE),
+ "TRANSACTION_SERIALIZABLE"
+ );
+
public static final PropertyDescriptor FETCH_SIZE = new
PropertyDescriptor.Builder()
.name("Fetch Size")
.description("The number of result rows to be fetched from the
result set at a time. This is a hint to the database driver and may not be "
@@ -112,6 +134,14 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor TRANS_ISOLATION_LEVEL = new
PropertyDescriptor.Builder()
+ .name("transaction-isolation-level")
+ .displayName("Transaction Isolation Level")
+ .description("This setting will set the transaction isolation
level for the database connection for drivers that support this setting")
+ .required(false)
+ .allowableValues(TRANSACTION_NONE,TRANSACTION_READ_COMMITTED,
TRANSACTION_READ_UNCOMMITTED, TRANSACTION_REPEATABLE_READ,
TRANSACTION_SERIALIZABLE)
+ .build();
+
@Override
public Set<Relationship> getRelationships() {
return relationships;
@@ -170,7 +200,9 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
?
context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
: 0;
-
+ final Integer transIsolationLevel =
context.getProperty(TRANS_ISOLATION_LEVEL).isSet()
+ ? context.getProperty(TRANS_ISOLATION_LEVEL).asInteger()
+ : null;
SqlWriter sqlWriter = configureSqlWriter(session, context);
@@ -227,6 +259,10 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
}
}
+ if (transIsolationLevel != null) {
+ con.setTransactionIsolation(transIsolationLevel);
+ }
+
String jdbcURL = "DBCPService";
try {
DatabaseMetaData databaseMetaData = con.getMetaData();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 0c7407f..1089370 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -109,6 +109,7 @@ public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
pds.add(OUTPUT_BATCH_SIZE);
pds.add(MAX_FRAGMENTS);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
+ pds.add(TRANS_ISOLATION_LEVEL);
pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE);