This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new af8128bb5c NIFI-12010: Handle auto-commit and commit based on driver capabilities in SQL components
af8128bb5c is described below
commit af8128bb5c26e418d31ec38741648dd87607c33b
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Aug 30 23:30:22 2023 -0400
NIFI-12010: Handle auto-commit and commit based on driver capabilities in SQL components
Signed-off-by: Arpad Boda <[email protected]>
This closes #7663
---
.../org/apache/nifi/admin/AuditDataSourceFactoryBean.java | 10 +++++++++-
.../transaction/impl/StandardTransactionBuilder.java | 10 +++++++++-
.../apache/nifi/processors/groovyx/ExecuteGroovyScript.java | 13 +++++++++++--
.../java/org/apache/nifi/hive/metastore/ScriptRunner.java | 5 ++++-
.../apache/nifi/processors/standard/AbstractExecuteSQL.java | 12 +++++++++++-
.../java/org/apache/nifi/processors/standard/PutSQL.java | 7 ++++++-
.../org/apache/nifi/processors/standard/TestExecuteSQL.java | 3 ++-
.../org/apache/nifi/record/sink/db/DatabaseRecordSink.java | 9 ++++++++-
8 files changed, 60 insertions(+), 9 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java
index 11d7c30e64..4bac1716c0 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java
@@ -28,6 +28,7 @@ import java.io.File;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
/**
@@ -151,7 +152,14 @@ public class AuditDataSourceFactoryBean implements
FactoryBean {
try {
// get a connection
connection = connectionPool.getConnection();
- connection.setAutoCommit(false);
+ final boolean isAutoCommit = connection.getAutoCommit();
+ if (isAutoCommit) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ logger.debug("setAutoCommit(false) not supported by
this driver");
+ }
+ }
// create a statement for initializing the database
statement = connection.createStatement();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java
index 7d4a1fcc44..e4b12180c2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java
@@ -18,6 +18,7 @@ package org.apache.nifi.admin.service.transaction.impl;
import java.sql.Connection;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import javax.sql.DataSource;
import org.apache.nifi.admin.service.transaction.Transaction;
import org.apache.nifi.admin.service.transaction.TransactionBuilder;
@@ -35,7 +36,14 @@ public class StandardTransactionBuilder implements
TransactionBuilder {
try {
// get a new connection
Connection connection = dataSource.getConnection();
- connection.setAutoCommit(false);
+ final boolean isAutoCommit = connection.getAutoCommit();
+ if (isAutoCommit) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ throw new TransactionException("setAutoCommit(false) not
supported by this driver");
+ }
+ }
// create a new transaction
return new StandardTransaction(connection);
diff --git
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
index 08addbdb8b..23b91ae350 100644
---
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
+++
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
@@ -57,6 +57,7 @@ import org.codehaus.groovy.runtime.StackTraceUtils;
import java.io.File;
import java.lang.reflect.Method;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -355,7 +356,11 @@ public class ExecuteGroovyScript extends AbstractProcessor
{
//try to set autocommit to false
try {
if (sql.getConnection().getAutoCommit()) {
- sql.getConnection().setAutoCommit(false);
+ try {
+ sql.getConnection().setAutoCommit(false);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ getLogger().debug("setAutoCommit(false) not supported
by this driver");
+ }
}
} catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=false for `" +
e.getKey() + "`", ei);
@@ -384,7 +389,11 @@ public class ExecuteGroovyScript extends AbstractProcessor
{
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
- sql.getConnection().setAutoCommit(true); //default
autocommit value in nifi
+ try {
+ sql.getConnection().setAutoCommit(true); //default
autocommit value in nifi
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ getLogger().debug("setAutoCommit(true) not supported
by this driver");
+ }
}
} catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=true for `" +
e.getKey() + "`", ei);
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java
index 4e4c65d043..d3666fdbe4 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java
@@ -33,7 +33,10 @@ public class ScriptRunner {
public ScriptRunner(Connection connection) throws SQLException {
this.connection = connection;
- this.connection.setAutoCommit(true);
+ if (!this.connection.getAutoCommit()) {
+ // May throw SQLFeatureNotSupportedException which is a subclass
of SQLException
+ this.connection.setAutoCommit(true);
+ }
}
public void runScript(Reader reader) throws IOException, SQLException {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index cb6bc761f8..a76b2386be 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -42,6 +42,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
@@ -255,7 +256,16 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess ==
null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
- con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean());
+ final boolean isAutoCommit = con.getAutoCommit();
+ final boolean setAutoCommitValue =
context.getProperty(AUTO_COMMIT).asBoolean();
+ // Only set auto-commit if necessary, log any "feature not
supported" exceptions
+ if (isAutoCommit != setAutoCommitValue) {
+ try {
+ con.setAutoCommit(setAutoCommitValue);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ logger.debug("setAutoCommit({}) not supported by this
driver", setAutoCommitValue);
+ }
+ }
try (final PreparedStatement st =
con.prepareStatement(selectQuery)) {
if (fetchSize != null && fetchSize > 0) {
try {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 4c365c91f2..3068f07587 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -60,6 +60,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -291,7 +292,11 @@ public class PutSQL extends
AbstractSessionFactoryProcessor {
fc.originalAutoCommit = connection.getAutoCommit();
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
if(fc.originalAutoCommit != autocommit) {
- connection.setAutoCommit(autocommit);
+ try {
+ connection.setAutoCommit(autocommit);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ getLogger().debug("setAutoCommit({}) not supported by this
driver", autocommit);
+ }
}
} catch (SQLException e) {
throw new ProcessException("Failed to disable auto commit due to "
+ e, e);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index b993587377..69c6dab739 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -786,7 +787,7 @@ public class TestExecuteSQL {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con =
DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
- return con;
+ return Mockito.spy(con);
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java
index 27d9b79119..0a4a1fbfb7 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java
@@ -46,6 +46,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLDataException;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -202,7 +203,13 @@ public class DatabaseRecordSink extends
AbstractControllerService implements Rec
try {
connection = dbcpService.getConnection(attributes);
originalAutoCommit = connection.getAutoCommit();
- connection.setAutoCommit(false);
+ if (originalAutoCommit) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ getLogger().debug("setAutoCommit(false) not supported by
this driver");
+ }
+ }
final DMLSettings settings = new DMLSettings(context);
final String catalog =
context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue();
final String schemaName =
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue();