This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7767a5b85d NIFI-10265 Route to Failure on Connect Exceptions in PutDatabaseRecord
7767a5b85d is described below

commit 7767a5b85d1633fde17414bf203f6e6e659236f6
Author: exceptionfactory <[email protected]>
AuthorDate: Thu Jul 21 22:57:02 2022 -0500

    NIFI-10265 Route to Failure on Connect Exceptions in PutDatabaseRecord
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #6235
---
 .../processors/standard/PutDatabaseRecord.java     | 42 ++++++++-----
 .../processors/standard/PutDatabaseRecordTest.java | 73 ++++++++++++++--------
 2 files changed, 72 insertions(+), 43 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index c15cddca29..28f2a3b5e3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -84,6 +84,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -479,10 +480,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
 
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final Connection connection = 
dbcpService.getConnection(flowFile.getAttributes());
+        Optional<Connection> connectionHolder = Optional.empty();
 
         boolean originalAutoCommit = false;
         try {
+            final Connection connection = 
dbcpService.getConnection(flowFile.getAttributes());
+            connectionHolder = Optional.of(connection);
+
             originalAutoCommit = connection.getAutoCommit();
             connection.setAutoCommit(false);
 
@@ -513,25 +517,31 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 session.transfer(flowFile, relationship);
             }
 
-            try {
-                connection.rollback();
-            } catch (final Exception e1) {
-                getLogger().error("Failed to rollback JDBC transaction", e1);
-            }
-        } finally {
-            if (originalAutoCommit) {
+            connectionHolder.ifPresent(connection -> {
                 try {
-                    connection.setAutoCommit(true);
-                } catch (final Exception e) {
-                    getLogger().warn("Failed to set auto-commit back to true 
on connection {} after finishing update", connection);
+                    connection.rollback();
+                } catch (final Exception rollbackException) {
+                    getLogger().error("Failed to rollback JDBC transaction", 
rollbackException);
                 }
+            });
+        } finally {
+            if (originalAutoCommit) {
+                connectionHolder.ifPresent(connection -> {
+                    try {
+                        connection.setAutoCommit(true);
+                    } catch (final Exception autoCommitException) {
+                        getLogger().warn("Failed to set auto-commit back to 
true on connection", autoCommitException);
+                    }
+                });
             }
 
-            try {
-                connection.close();
-            } catch (final Exception e) {
-                getLogger().warn("Failed to close database connection", e);
-            }
+            connectionHolder.ifPresent(connection -> {
+                try {
+                    connection.close();
+                } catch (final Exception closeException) {
+                    getLogger().warn("Failed to close database connection", 
closeException);
+                }
+            });
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index da458908b5..04206f8933 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -24,10 +24,10 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.util.file.FileUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
@@ -43,25 +43,28 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.spy;
-
+import static org.mockito.Mockito.when;
 
 public class PutDatabaseRecordTest {
 
+    private static final String CONNECTION_FAILED = "Connection Failed";
+
+    private static final String PARSER_ID = 
MockRecordParser.class.getSimpleName();
+
+    private static final String TABLE_NAME = "PERSONS";
+
     private static final String createPersons = "CREATE TABLE PERSONS (id 
integer primary key, name varchar(100)," +
             " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 
1000), dt date)";
-    private static final String createPersonsSchema1 = "CREATE TABLE 
SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
-            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 
1000), dt date)";
-    private static final String createPersonsSchema2 = "CREATE TABLE 
SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
-            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 
1000), dt date)";
     private final static String DB_LOCATION = "target/db_pdr";
 
     TestRunner runner;
     PutDatabaseRecord processor;
     DBCPServiceSimpleImpl dbcp;
 
-    @BeforeClass
-    public static void setupBeforeClass() throws IOException {
+    @BeforeAll
+    public static void setDatabaseLocation() {
         System.setProperty("derby.stream.error.file", "target/derby.log");
 
         // remove previous test database, if any
@@ -73,8 +76,8 @@ public class PutDatabaseRecordTest {
         }
     }
 
-    @AfterClass
-    public static void cleanUpAfterClass() throws Exception {
+    @AfterAll
+    public static void shutdownDatabase() throws Exception {
         try {
             DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + 
";shutdown=true");
         } catch (SQLNonTransientConnectionException ignore) {
@@ -89,8 +92,8 @@ public class PutDatabaseRecordTest {
         }
     }
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    public void setRunner() throws Exception {
         processor = new PutDatabaseRecord();
         //Mock the DBCP Controller Service so we can control the Results
         dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
@@ -104,7 +107,25 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    public void testInsertNonRequiredColumnsUnmatchedField() throws 
InitializationException, ProcessException, SQLException, IOException {
+    public void testGetConnectionFailure() throws InitializationException {
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService(PARSER_ID, parser);
+        runner.enableControllerService(parser);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
+
+        when(dbcp.getConnection(anyMap())).thenThrow(new 
ProcessException(CONNECTION_FAILED));
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
+    }
+
+    @Test
+    public void testInsertNonRequiredColumnsUnmatchedField() throws 
InitializationException, ProcessException {
         // Need to override the @Before method with a new processor that behaves badly
         processor = new PutDatabaseRecordUnmatchedField();
         //Mock the DBCP Controller Service so we can control the Results
@@ -117,9 +138,9 @@ public class PutDatabaseRecordTest {
         runner.enableControllerService(dbcp);
         runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
 
-        recreateTable(createPersons);
+        recreateTable();
         final MockRecordParser parser = new MockRecordParser();
-        runner.addControllerService("parser", parser);
+        runner.addControllerService(PARSER_ID, parser);
         runner.enableControllerService(parser);
 
         parser.addSchemaField("id", RecordFieldType.INT);
@@ -128,11 +149,9 @@ public class PutDatabaseRecordTest {
         parser.addSchemaField("dt", RecordFieldType.DATE);
 
         LocalDate testDate1 = LocalDate.of(2021, 1, 26);
-        Date nifiDate1 = new 
Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in 
UTC
-        Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
+        Date nifiDate1 = new 
Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
         LocalDate testDate2 = LocalDate.of(2021, 7, 26);
-        Date nifiDate2 = new 
Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in 
URC
-        Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
+        Date nifiDate2 = new 
Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
 
         parser.addRecord(1, "rec1", "test", nifiDate1);
         parser.addRecord(2, "rec2", "test", nifiDate2);
@@ -140,9 +159,9 @@ public class PutDatabaseRecordTest {
         parser.addRecord(4, "rec4", "test", null);
         parser.addRecord(5, null, null, null);
 
-        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
-        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
         runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, 
PutDatabaseRecord.FAIL_UNMATCHED_FIELD);
 
         runner.enqueue(new byte[0]);
@@ -152,11 +171,11 @@ public class PutDatabaseRecordTest {
         runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
     }
 
-    private void recreateTable(String createSQL) throws ProcessException, 
SQLException {
+    private void recreateTable() throws ProcessException {
         try (final Connection conn = dbcp.getConnection();
             final Statement stmt = conn.createStatement()) {
             stmt.execute("drop table PERSONS");
-            stmt.execute(createSQL);
+            stmt.execute(createPersons);
         } catch (SQLException ignore) {
             // Do nothing, may not have existed
         }
@@ -164,7 +183,7 @@ public class PutDatabaseRecordTest {
 
     static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
         @Override
-        SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String 
tableName, TableSchema tableSchema, DMLSettings settings) throws 
IllegalArgumentException, SQLException {
+        SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String 
tableName, TableSchema tableSchema, DMLSettings settings) throws 
IllegalArgumentException {
             return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES 
(?,?,?,?)", Arrays.asList(0,1,2,3));
         }
     }

Reply via email to