This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 932cfe22a3 NIFI-12010: Handle auto-commit and commit based on driver capabilities in SQL components
932cfe22a3 is described below

commit 932cfe22a379ea83e11fe283a3f789115cce1607
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Aug 30 23:30:22 2023 -0400

    NIFI-12010: Handle auto-commit and commit based on driver capabilities in SQL components
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #7663
---
 .../org/apache/nifi/admin/AuditDataSourceFactoryBean.java   | 10 +++++++++-
 .../transaction/impl/StandardTransactionBuilder.java        | 10 +++++++++-
 .../apache/nifi/processors/groovyx/ExecuteGroovyScript.java | 13 +++++++++++--
 .../java/org/apache/nifi/hive/metastore/ScriptRunner.java   |  5 ++++-
 .../apache/nifi/processors/standard/AbstractExecuteSQL.java | 12 +++++++++++-
 .../java/org/apache/nifi/processors/standard/PutSQL.java    |  7 ++++++-
 .../org/apache/nifi/processors/standard/TestExecuteSQL.java |  3 ++-
 .../org/apache/nifi/record/sink/db/DatabaseRecordSink.java  |  9 ++++++++-
 8 files changed, 60 insertions(+), 9 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java
index 22165d2b68..040a207dcb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java
@@ -27,6 +27,7 @@ import java.io.File;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
 
 /**
@@ -146,7 +147,14 @@ public class AuditDataSourceFactoryBean implements FactoryBean {
             try {
                 // get a connection
                 connection = connectionPool.getConnection();
-                connection.setAutoCommit(false);
+                final boolean isAutoCommit = connection.getAutoCommit();
+                if (isAutoCommit) {
+                    try {
+                        connection.setAutoCommit(false);
+                    } catch (SQLFeatureNotSupportedException sfnse) {
+                        logger.debug("setAutoCommit(false) not supported by this driver");
+                    }
+                }
 
                 // create a statement for initializing the database
                 statement = connection.createStatement();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java
index 7d4a1fcc44..e4b12180c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java
@@ -18,6 +18,7 @@ package org.apache.nifi.admin.service.transaction.impl;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import javax.sql.DataSource;
 import org.apache.nifi.admin.service.transaction.Transaction;
 import org.apache.nifi.admin.service.transaction.TransactionBuilder;
@@ -35,7 +36,14 @@ public class StandardTransactionBuilder implements TransactionBuilder {
         try {
             // get a new connection
             Connection connection = dataSource.getConnection();
-            connection.setAutoCommit(false);
+            final boolean isAutoCommit = connection.getAutoCommit();
+            if (isAutoCommit) {
+                try {
+                    connection.setAutoCommit(false);
+                } catch (SQLFeatureNotSupportedException sfnse) {
+                    throw new TransactionException("setAutoCommit(false) not supported by this driver");
+                }
+            }
 
             // create a new transaction
             return new StandardTransaction(connection);
diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
index c788e2c4dc..eca8b431a4 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
@@ -56,6 +56,7 @@ import org.codehaus.groovy.runtime.StackTraceUtils;
 import java.io.File;
 import java.lang.reflect.Method;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -353,7 +354,11 @@ public class ExecuteGroovyScript extends AbstractProcessor {
             //try to set autocommit to false
             try {
                 if (sql.getConnection().getAutoCommit()) {
-                    sql.getConnection().setAutoCommit(false);
+                    try {
+                        sql.getConnection().setAutoCommit(false);
+                    } catch (SQLFeatureNotSupportedException sfnse) {
+                        getLogger().debug("setAutoCommit(false) not supported by this driver");
+                    }
                 }
             } catch (Throwable ei) {
                 getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei);
@@ -382,7 +387,11 @@ public class ExecuteGroovyScript extends AbstractProcessor {
             OSql sql = (OSql) e.getValue();
             try {
                 if (!sql.getConnection().getAutoCommit()) {
-                    sql.getConnection().setAutoCommit(true); //default autocommit value in nifi
+                    try {
+                        sql.getConnection().setAutoCommit(true); //default autocommit value in nifi
+                    } catch (SQLFeatureNotSupportedException sfnse) {
+                        getLogger().debug("setAutoCommit(true) not supported by this driver");
+                    }
                 }
             } catch (Throwable ei) {
                 getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei);
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java
index 4e4c65d043..d3666fdbe4 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java
@@ -33,7 +33,10 @@ public class ScriptRunner {
 
     public ScriptRunner(Connection connection) throws SQLException {
         this.connection = connection;
-        this.connection.setAutoCommit(true);
+        if (!this.connection.getAutoCommit()) {
+            // May throw SQLFeatureNotSupportedException which is a subclass of SQLException
+            this.connection.setAutoCommit(true);
+        }
     }
 
     public void runScript(Reader reader) throws IOException, SQLException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index cb6bc761f8..a76b2386be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -42,6 +42,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -255,7 +256,16 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
 
         int resultCount = 0;
         try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
-            con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean());
+            final boolean isAutoCommit = con.getAutoCommit();
+            final boolean setAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
+            // Only set auto-commit if necessary, log any "feature not supported" exceptions
+            if (isAutoCommit != setAutoCommitValue) {
+                try {
+                    con.setAutoCommit(setAutoCommitValue);
+                } catch (SQLFeatureNotSupportedException sfnse) {
+                    logger.debug("setAutoCommit({}) not supported by this driver", setAutoCommitValue);
+                }
+            }
             try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
                 if (fetchSize != null && fetchSize > 0) {
                     try {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 4c365c91f2..3068f07587 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -60,6 +60,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLNonTransientException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -291,7 +292,11 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
             fc.originalAutoCommit = connection.getAutoCommit();
             final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
             if(fc.originalAutoCommit != autocommit) {
-                connection.setAutoCommit(autocommit);
+                try {
+                    connection.setAutoCommit(autocommit);
+                } catch (SQLFeatureNotSupportedException sfnse) {
+                    getLogger().debug("setAutoCommit({}) not supported by this driver", autocommit);
+                }
             }
         } catch (SQLException e) {
             throw new ProcessException("Failed to disable auto commit due to " + e, e);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index b993587377..69c6dab739 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -786,7 +787,7 @@ public class TestExecuteSQL {
             try {
                 Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
                 final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
-                return con;
+                return Mockito.spy(con);
             } catch (final Exception e) {
                 throw new ProcessException("getConnection failed: " + e);
             }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java
index 27d9b79119..0a4a1fbfb7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java
@@ -46,6 +46,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -202,7 +203,13 @@ public class DatabaseRecordSink extends AbstractControllerService implements Rec
         try {
             connection = dbcpService.getConnection(attributes);
             originalAutoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
+            if (originalAutoCommit) {
+                try {
+                    connection.setAutoCommit(false);
+                } catch (SQLFeatureNotSupportedException sfnse) {
+                    getLogger().debug("setAutoCommit(false) not supported by this driver");
+                }
+            }
             final DMLSettings settings = new DMLSettings(context);
             final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue();
             final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue();

Reply via email to