This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 63c72bd7e2 NIFI-11898 Handle commit based on driver capabilities in PutDatabaseRecord
63c72bd7e2 is described below
commit 63c72bd7e2e27229039ef28f695b19cefa6fdba1
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Aug 2 12:10:10 2023 -0400
NIFI-11898 Handle commit based on driver capabilities in PutDatabaseRecord
This closes #7561
Signed-off-by: David Handermann <[email protected]>
---
.../processors/standard/PutDatabaseRecord.java | 20 ++++--
.../processors/standard/PutDatabaseRecordTest.java | 80 ++++++++++++++++++++--
2 files changed, 89 insertions(+), 11 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index f90f907fc4..73ec4e83ee 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -70,6 +70,7 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLDataException;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
@@ -471,10 +472,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
connectionHolder = Optional.of(connection);
originalAutoCommit = connection.getAutoCommit();
- connection.setAutoCommit(false);
+ if (originalAutoCommit) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ getLogger().debug("setAutoCommit(false) not supported by this driver");
+ }
+ }
putToDatabase(context, session, flowFile, connection);
- connection.commit();
+ // Only commit the connection if auto-commit is false
+ if (!originalAutoCommit) {
+ connection.commit();
+ }
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection));
@@ -496,13 +506,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (rollbackOnFailure) {
session.rollback();
} else {
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
+ flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage()));
session.transfer(flowFile, relationship);
}
connectionHolder.ifPresent(connection -> {
try {
- connection.rollback();
+ if (!connection.getAutoCommit()) {
+ connection.rollback();
+ }
} catch (final Exception rollbackException) {
getLogger().error("Failed to rollback JDBC transaction", rollbackException);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index e3c6600dc6..e9fd13b005 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.dbcp2.DelegatingConnection;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.db.ColumnDescription;
@@ -53,6 +55,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
@@ -74,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -81,6 +85,8 @@ import static org.mockito.Mockito.when;
public class PutDatabaseRecordTest {
+ private static String DBCP_SERVICE_ID = "dbcp";
+
private static final String CONNECTION_FAILED = "Connection Failed";
private static final String PARSER_ID = MockRecordParser.class.getSimpleName();
@@ -102,7 +108,7 @@ public class PutDatabaseRecordTest {
TestRunner runner;
PutDatabaseRecord processor;
- DBCPServiceSimpleImpl dbcp;
+ DBCPService dbcp;
@BeforeAll
public static void setDatabaseLocation() {
@@ -143,9 +149,9 @@ public class PutDatabaseRecordTest {
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
- runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
- runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+ runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
}
@Test
@@ -166,6 +172,42 @@ public class PutDatabaseRecordTest {
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
}
+ @Test
+ public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException {
+ dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION);
+ final Map<String, String> dbcpProperties = new HashMap<>();
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
+
+ recreateTable(createPersons);
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("code", RecordFieldType.INT);
+ parser.addSchemaField("dt", RecordFieldType.DATE);
+
+ LocalDate testDate1 = LocalDate.of(2021, 1, 26);
+ Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
+ LocalDate testDate2 = LocalDate.of(2021, 7, 26);
+ Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
+
+ parser.addRecord(1, "rec1", 101, jdbcDate1);
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS);
+ }
+
@Test
public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
// Need to override the @Before method with a new processor that behaves badly
@@ -176,9 +218,9 @@ public class PutDatabaseRecordTest {
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
- runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
- runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+ runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
recreateTable();
final MockRecordParser parser = new MockRecordParser();
@@ -1766,9 +1808,9 @@ public class PutDatabaseRecordTest {
void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException {
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
runner = TestRunners.newTestRunner(processor);
- runner.addControllerService("dbcp", dbcp, new HashMap<>());
+ runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
- runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+ runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
try (Connection conn = dbcp.getConnection()) {
conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST");
}
@@ -1949,4 +1991,28 @@ public class PutDatabaseRecordTest {
return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3));
}
}
+
+ static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService {
+ private final String databaseLocation;
+
+ public DBCPServiceAutoCommitTest(final String databaseLocation) {
+ this.databaseLocation = databaseLocation;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return DBCP_SERVICE_ID;
+ }
+
+ @Override
+ public Connection getConnection() throws ProcessException {
+ try {
+ Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"));
+ doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false);
+ return spyConnection;
+ } catch (final Exception e) {
+ throw new ProcessException("getConnection failed: " + e);
+ }
+ }
+ }
}
\ No newline at end of file