This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 700f39963f NIFI-11898 Handle commit based on driver capabilities in PutDatabaseRecord
700f39963f is described below

commit 700f39963f5f785bd26c794ecfba5fa84c05d4fc
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Aug 2 12:10:10 2023 -0400

    NIFI-11898 Handle commit based on driver capabilities in PutDatabaseRecord
    
    This closes #7560
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/standard/PutDatabaseRecord.java     | 20 ++++--
 .../processors/standard/PutDatabaseRecordTest.java | 80 ++++++++++++++++++++--
 2 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 02f36b53cf..fdad3fe56f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -72,6 +72,7 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.SQLTransientException;
 import java.sql.Statement;
@@ -487,10 +488,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
             connectionHolder = Optional.of(connection);
 
             originalAutoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
+            if (originalAutoCommit) {
+                try {
+                    connection.setAutoCommit(false);
+                } catch (SQLFeatureNotSupportedException sfnse) {
+                    getLogger().debug("setAutoCommit(false) not supported by this driver");
+                }
+            }
 
             putToDatabase(context, session, flowFile, connection);
-            connection.commit();
+            // Only commit the connection if auto-commit is false
+            if (!originalAutoCommit) {
+                connection.commit();
+            }
 
             session.transfer(flowFile, REL_SUCCESS);
             session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection));
@@ -512,13 +522,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
             if (rollbackOnFailure) {
                 session.rollback();
             } else {
-                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
+                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage()));
                 session.transfer(flowFile, relationship);
             }
 
             connectionHolder.ifPresent(connection -> {
                 try {
-                    connection.rollback();
+                    if (!connection.getAutoCommit()) {
+                        connection.rollback();
+                    }
                 } catch (final Exception rollbackException) {
                     getLogger().error("Failed to rollback JDBC transaction", rollbackException);
                 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index e3c6600dc6..e9fd13b005 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.dbcp2.DelegatingConnection;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.processors.standard.db.ColumnDescription;
@@ -53,6 +55,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
 import java.time.LocalDate;
 import java.time.ZoneOffset;
@@ -74,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -81,6 +85,8 @@ import static org.mockito.Mockito.when;
 
 public class PutDatabaseRecordTest {
 
+    private static String DBCP_SERVICE_ID = "dbcp";
+
     private static final String CONNECTION_FAILED = "Connection Failed";
 
     private static final String PARSER_ID = MockRecordParser.class.getSimpleName();
@@ -102,7 +108,7 @@ public class PutDatabaseRecordTest {
 
     TestRunner runner;
     PutDatabaseRecord processor;
-    DBCPServiceSimpleImpl dbcp;
+    DBCPService dbcp;
 
     @BeforeAll
     public static void setDatabaseLocation() {
@@ -143,9 +149,9 @@ public class PutDatabaseRecordTest {
         final Map<String, String> dbcpProperties = new HashMap<>();
 
         runner = TestRunners.newTestRunner(processor);
-        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
-        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
     }
 
     @Test
@@ -166,6 +172,42 @@ public class PutDatabaseRecordTest {
         runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
     }
 
+    @Test
+    public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException {
+        dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION);
+        final Map<String, String> dbcpProperties = new HashMap<>();
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
+
+        recreateTable(createPersons);
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.INT);
+        parser.addSchemaField("dt", RecordFieldType.DATE);
+
+        LocalDate testDate1 = LocalDate.of(2021, 1, 26);
+        Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
+        LocalDate testDate2 = LocalDate.of(2021, 7, 26);
+        Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
+
+        parser.addRecord(1, "rec1", 101, jdbcDate1);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS);
+    }
+
     @Test
     public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
         // Need to override the @Before method with a new processor that behaves badly
@@ -176,9 +218,9 @@ public class PutDatabaseRecordTest {
         final Map<String, String> dbcpProperties = new HashMap<>();
 
         runner = TestRunners.newTestRunner(processor);
-        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
-        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
 
         recreateTable();
         final MockRecordParser parser = new MockRecordParser();
@@ -1766,9 +1808,9 @@ public class PutDatabaseRecordTest {
     void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException {
         dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
         runner = TestRunners.newTestRunner(processor);
-        runner.addControllerService("dbcp", dbcp, new HashMap<>());
+        runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>());
         runner.enableControllerService(dbcp);
-        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
         try (Connection conn = dbcp.getConnection()) {
             conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST");
         }
@@ -1949,4 +1991,28 @@ public class PutDatabaseRecordTest {
             return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3));
         }
     }
+
+    static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService {
+        private final String databaseLocation;
+
+        public DBCPServiceAutoCommitTest(final String databaseLocation) {
+            this.databaseLocation = databaseLocation;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return DBCP_SERVICE_ID;
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Connection spyConnection =  spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"));
+                doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false);
+                return spyConnection;
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
 }
\ No newline at end of file

Reply via email to