This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ef2251  NIFI-3611: Added ability to set Transaction Isolation Level 
on Database connection for the QueryDatabaseTable processor
4ef2251 is described below

commit 4ef2251d74e25e0e55ea911e7041fc97b1497ae9
Author: erichanson5 <[email protected]>
AuthorDate: Mon Jan 7 15:53:58 2019 -0500

    NIFI-3611: Added ability to set Transaction Isolation Level on Database 
connection for the QueryDatabaseTable processor
    
    NIFI-3611: Make TRANS_ISOLATION_LEVEL property optional
    
    This closes #3248.
    
    Signed-off-by: Koji Kawamura <[email protected]>
---
 .../standard/AbstractQueryDatabaseTable.java       | 38 +++++++++++++++++++++-
 .../processors/standard/QueryDatabaseTable.java    |  1 +
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 57933b3..6b166d9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -65,6 +66,27 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
     public static final String RESULT_TABLENAME = "tablename";
     public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
 
+    private static AllowableValue TRANSACTION_READ_COMMITTED = new 
AllowableValue(
+            String.valueOf(Connection.TRANSACTION_READ_COMMITTED),
+            "TRANSACTION_READ_COMMITTED"
+    );
+    private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new 
AllowableValue(
+            String.valueOf(Connection.TRANSACTION_READ_UNCOMMITTED),
+            "TRANSACTION_READ_UNCOMMITTED"
+    );
+    private static AllowableValue TRANSACTION_REPEATABLE_READ = new 
AllowableValue(
+            String.valueOf(Connection.TRANSACTION_REPEATABLE_READ),
+            "TRANSACTION_REPEATABLE_READ"
+    );
+    private static AllowableValue TRANSACTION_NONE =  new AllowableValue(
+            String.valueOf(Connection.TRANSACTION_NONE),
+            "TRANSACTION_NONE"
+    );
+    private static AllowableValue TRANSACTION_SERIALIZABLE = new 
AllowableValue(
+            String.valueOf(Connection.TRANSACTION_SERIALIZABLE),
+            "TRANSACTION_SERIALIZABLE"
+    );
+
     public static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
             .name("Fetch Size")
             .description("The number of result rows to be fetched from the 
result set at a time. This is a hint to the database driver and may not be "
@@ -112,6 +134,14 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor TRANS_ISOLATION_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("transaction-isolation-level")
+            .displayName("Transaction Isolation Level")
+            .description("This setting will set the transaction isolation 
level for the database connection for drivers that support this setting")
+            .required(false)
+            .allowableValues(TRANSACTION_NONE,TRANSACTION_READ_COMMITTED, 
TRANSACTION_READ_UNCOMMITTED, TRANSACTION_REPEATABLE_READ, 
TRANSACTION_SERIALIZABLE)
+            .build();
+
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
@@ -170,7 +200,9 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
         final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
                 ? 
context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
                 : 0;
-
+        final Integer transIsolationLevel = 
context.getProperty(TRANS_ISOLATION_LEVEL).isSet()
+                ? context.getProperty(TRANS_ISOLATION_LEVEL).asInteger()
+                : null;
 
         SqlWriter sqlWriter = configureSqlWriter(session, context);
 
@@ -227,6 +259,10 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
                 }
             }
 
+            if (transIsolationLevel != null) {
+                con.setTransactionIsolation(transIsolationLevel);
+            }
+
             String jdbcURL = "DBCPService";
             try {
                 DatabaseMetaData databaseMetaData = con.getMetaData();
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 0c7407f..1089370 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -109,6 +109,7 @@ public class QueryDatabaseTable extends 
AbstractQueryDatabaseTable {
         pds.add(OUTPUT_BATCH_SIZE);
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
+        pds.add(TRANS_ISOLATION_LEVEL);
         pds.add(USE_AVRO_LOGICAL_TYPES);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);

Reply via email to