This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1514890371 NIFI-12993 Add auto commit feature and add batch processing 
for the sql stmt type
1514890371 is described below

commit 15148903712761e52868b59060ba2235486f0523
Author: Jim Steinebrey <[email protected]>
AuthorDate: Tue Apr 2 14:51:34 2024 -0400

    NIFI-12993 Add auto commit feature and add batch processing for the sql 
stmt type
    
    NIFI-12993 Removed underscore from a few local variables.
    
    NIFI-12993 Refactored unit tests into a single java file
    
    NIFI-12993 Changed Optional.isEmpty() to !Optional.isPresent() so it can 
work in NiFi 1.x Java 8
    
    Signed-off-by: Matt Burgess <[email protected]>
    
    This closes #8597
---
 .../processors/standard/PutDatabaseRecord.java     | 243 ++++++++---
 .../processors/standard/PutDatabaseRecordTest.java | 458 +++++++++++++++++----
 2 files changed, 554 insertions(+), 147 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 26b4281cd0..79b98d20e2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -27,6 +27,7 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.documentation.UseCase;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
@@ -376,6 +377,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .build();
 
+    static final PropertyDescriptor AUTO_COMMIT = new 
PropertyDescriptor.Builder()
+            .name("database-session-autocommit")
+            .displayName("Database Session AutoCommit")
+            .description("The autocommit mode to set on the database 
connection being used. If set to false, the operation(s) will be explicitly 
committed or rolled back "
+                    + "(based on success or failure respectively). If set to 
true, the driver/database automatically handles the commit/rollback.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     static final PropertyDescriptor DB_TYPE;
 
     protected static final Map<String, DatabaseAdapter> dbAdapters;
@@ -431,6 +443,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
         pds.add(TABLE_SCHEMA_CACHE_SIZE);
         pds.add(MAX_BATCH_SIZE);
+        pds.add(AUTO_COMMIT);
 
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -466,9 +479,41 @@ public class PutDatabaseRecord extends AbstractProcessor {
             );
         }
 
+        final boolean autoCommit = 
validationContext.getProperty(AUTO_COMMIT).asBoolean();
+        final boolean rollbackOnFailure = 
validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+        if (autoCommit && rollbackOnFailure) {
+            validationResults.add(new ValidationResult.Builder()
+                    
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
+                    .explanation(format("'%s' cannot be set to 'true' when 
'%s' is also set to 'true'. "
+                                    + "Transaction rollbacks for batch updates 
cannot be supported when auto commit is set to 'true'",
+                            
RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), 
AUTO_COMMIT.getDisplayName()))
+                    .build());
+        }
+
+        if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
+                final String explanation = format("'%s' must be hard-coded to 
zero when '%s' is set to 'true'."
+                                + " Batch size equal to zero executes all 
statements in a single transaction"
+                                + " which allows automatic rollback to revert 
all statements if an error occurs",
+                        MAX_BATCH_SIZE.getDisplayName(), 
AUTO_COMMIT.getDisplayName());
+
+                validationResults.add(new ValidationResult.Builder()
+                        .subject(MAX_BATCH_SIZE.getDisplayName())
+                        .explanation(explanation)
+                        .build());
+        }
+
         return validationResults;
     }
 
+    private boolean isMaxBatchSizeHardcodedToZero(ValidationContext 
validationContext) {
+        try {
+            return 
!validationContext.getProperty(MAX_BATCH_SIZE).isExpressionLanguagePresent()
+                    && 0 == 
validationContext.getProperty(MAX_BATCH_SIZE).asInteger();
+        } catch (Exception ex) {
+            return false;
+        }
+    }
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         databaseAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
@@ -490,6 +535,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
         dataRecordPath = dataRecordPathValue == null ? null : 
RecordPath.compile(dataRecordPathValue);
     }
 
+    @OnUnscheduled
+    public final void onUnscheduled() {
+        supportsBatchUpdates = Optional.empty();
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -498,84 +548,98 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
 
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        Optional<Connection> connectionHolder = Optional.empty();
 
+        Connection connection = null;
         boolean originalAutoCommit = false;
         try {
-            final Connection connection = 
dbcpService.getConnection(flowFile.getAttributes());
-            connectionHolder = Optional.of(connection);
+            connection = dbcpService.getConnection(flowFile.getAttributes());
 
             originalAutoCommit = connection.getAutoCommit();
-            if (originalAutoCommit) {
+            final boolean autoCommit = 
context.getProperty(AUTO_COMMIT).asBoolean();
+            if (originalAutoCommit != autoCommit) {
                 try {
-                    connection.setAutoCommit(false);
+                    connection.setAutoCommit(autoCommit);
                 } catch (SQLFeatureNotSupportedException sfnse) {
-                    getLogger().debug("setAutoCommit(false) not supported by 
this driver");
+                    getLogger().debug(String.format("setAutoCommit(%s) not 
supported by this driver", autoCommit), sfnse);
                 }
             }
 
             putToDatabase(context, session, flowFile, connection);
+
             // Only commit the connection if auto-commit is false
-            if (!originalAutoCommit) {
+            if (!connection.getAutoCommit()) {
                 connection.commit();
             }
 
             session.transfer(flowFile, REL_SUCCESS);
             session.getProvenanceReporter().send(flowFile, 
getJdbcUrl(connection));
         } catch (final Exception e) {
-            // When an Exception is thrown, we want to route to 'retry' if we 
expect that attempting the same request again
-            // might work. Otherwise, route to failure. SQLTransientException 
is a specific type that indicates that a retry may work.
-            final Relationship relationship;
-            final Throwable toAnalyze = (e instanceof BatchUpdateException) ? 
e.getCause() : e;
-            if (toAnalyze instanceof SQLTransientException) {
-                relationship = REL_RETRY;
-                flowFile = session.penalize(flowFile);
-            } else {
-                relationship = REL_FAILURE;
-            }
+            routeOnException(context, session, connection, e, flowFile);
+        } finally {
+            closeConnection(connection, originalAutoCommit);
+        }
+    }
 
-            getLogger().error("Failed to put Records to database for {}. 
Routing to {}.", flowFile, relationship, e);
+    private void routeOnException(final ProcessContext context, final 
ProcessSession session,
+                                  Connection connection, Exception e, FlowFile 
flowFile) {
+        // When an Exception is thrown, we want to route to 'retry' if we 
expect that attempting the same request again
+        // might work. Otherwise, route to failure. SQLTransientException is a 
specific type that indicates that a retry may work.
+        final Relationship relationship;
+        final Throwable toAnalyze = (e instanceof BatchUpdateException) ? 
e.getCause() : e;
+        if (toAnalyze instanceof SQLTransientException) {
+            relationship = REL_RETRY;
+            flowFile = session.penalize(flowFile);
+        } else {
+            relationship = REL_FAILURE;
+        }
 
-            final boolean rollbackOnFailure = 
context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
-            if (rollbackOnFailure) {
-                session.rollback();
-            } else {
-                flowFile = session.putAttribute(flowFile, 
PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": 
e.getMessage()));
-                session.transfer(flowFile, relationship);
+        getLogger().error("Failed to put Records to database for {}. Routing 
to {}.", flowFile, relationship, e);
+
+        final boolean rollbackOnFailure = 
context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+        if (rollbackOnFailure) {
+            session.rollback();
+        } else {
+            flowFile = session.putAttribute(flowFile, 
PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": 
e.getMessage()));
+            session.transfer(flowFile, relationship);
+        }
+
+        rollbackConnection(connection);
+    }
+
+    private void rollbackConnection(Connection connection) {
+        if (connection != null) {
+            try {
+                if (!connection.getAutoCommit()) {
+                    connection.rollback();
+                }
+            } catch (final Exception rollbackException) {
+                getLogger().error("Failed to rollback JDBC transaction", 
rollbackException);
             }
+        }
+    }
 
-            connectionHolder.ifPresent(connection -> {
-                try {
-                    if (!connection.getAutoCommit()) {
-                        connection.rollback();
-                    }
-                } catch (final Exception rollbackException) {
-                    getLogger().error("Failed to rollback JDBC transaction", 
rollbackException);
+    private void closeConnection(Connection connection, boolean 
originalAutoCommit) {
+        if (connection != null) {
+            try {
+                if (originalAutoCommit != connection.getAutoCommit()) {
+                    connection.setAutoCommit(originalAutoCommit);
                 }
-            });
-        } finally {
-            if (originalAutoCommit) {
-                connectionHolder.ifPresent(connection -> {
-                    try {
-                        connection.setAutoCommit(true);
-                    } catch (final Exception autoCommitException) {
-                        getLogger().warn("Failed to set auto-commit back to 
true on connection", autoCommitException);
-                    }
-                });
+            } catch (final Exception autoCommitException) {
+                getLogger().warn(String.format("Failed to set auto-commit back 
to %s on connection", originalAutoCommit), autoCommitException);
             }
 
-            connectionHolder.ifPresent(connection -> {
-                try {
+            try {
+                if (!connection.isClosed()) {
                     connection.close();
-                } catch (final Exception closeException) {
-                    getLogger().warn("Failed to close database connection", 
closeException);
                 }
-            });
+            } catch (final Exception closeException) {
+                getLogger().warn("Failed to close database connection", 
closeException);
+            }
         }
     }
 
-
-    private void executeSQL(final ProcessContext context, final FlowFile 
flowFile, final Connection connection, final RecordReader recordReader)
+    private void executeSQL(final ProcessContext context, final ProcessSession 
session, final FlowFile flowFile,
+                            final Connection connection, final RecordReader 
recordReader)
             throws IllegalArgumentException, MalformedRecordException, 
IOException, SQLException {
 
         final RecordSchema recordSchema = recordReader.getSchema();
@@ -602,23 +666,66 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 }
             }
 
-            Record currentRecord;
-            while ((currentRecord = recordReader.nextRecord()) != null) {
+            final ComponentLog log = getLogger();
+            final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
+            // Do not use batch if set to batch size of 1 because that is 
similar to not using batching.
+            // Also do not use batches if the connection does not support 
batching.
+            boolean useBatch = maxBatchSize != 1 && 
isSupportBatchUpdates(connection);
+            int currentBatchSize = 0;
+            int batchIndex = 0;
+
+            boolean isFirstRecord = true;
+            Record nextRecord = recordReader.nextRecord();
+            while (nextRecord != null) {
+                Record currentRecord = nextRecord;
                 final String sql = currentRecord.getAsString(sqlField);
+                nextRecord = recordReader.nextRecord();
+
                 if (sql == null || StringUtils.isEmpty(sql)) {
                     throw new MalformedRecordException(format("Record had no 
(or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, 
flowFile));
                 }
 
-                // Execute the statement(s) as-is
+                final String[] sqlStatements;
                 if 
(context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) {
                     final String regex = "(?<!\\\\);";
-                    final String[] sqlStatements = (sql).split(regex);
-                    for (String sqlStatement : sqlStatements) {
+                    sqlStatements = (sql).split(regex);
+                } else {
+                    sqlStatements = new String[] { sql };
+                }
+
+                if (isFirstRecord) {
+                    // If there is only one sql statement to process, then do 
not use batching.
+                    if (nextRecord == null && sqlStatements.length == 1) {
+                        useBatch = false;
+                    }
+                    isFirstRecord = false;
+                }
+
+                for (String sqlStatement : sqlStatements) {
+                    if (useBatch) {
+                        currentBatchSize++;
+                        statement.addBatch(sqlStatement);
+                    } else {
                         statement.execute(sqlStatement);
                     }
-                } else {
-                    statement.execute(sql);
                 }
+
+                if (useBatch && maxBatchSize > 0 && currentBatchSize >= 
maxBatchSize) {
+                    batchIndex++; // count of batches executed so far, including this one
+                    log.debug("Executing batch with last query {} because 
batch reached max size {} for {}; batch index: {}; batch size: {}",
+                            sql, maxBatchSize, flowFile, batchIndex, 
currentBatchSize);
+                    statement.executeBatch();
+                    session.adjustCounter("Batches Executed", 1, false);
+                    currentBatchSize = 0; // start counting statements for the next batch
+                }
+            }
+
+            if (useBatch && currentBatchSize > 0) {
+                batchIndex++;
+                log.debug("Executing last batch because last statement reached 
for {}; batch index: {}; batch size: {}",
+                        flowFile, batchIndex, currentBatchSize);
+                statement.executeBatch();
+                session.adjustCounter("Batches Executed", 1, false);
             }
         }
     }
@@ -730,7 +837,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     final List<Integer> fieldIndexes = 
preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
                     final String sql = 
preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
 
-                    if (currentBatchSize > 0 && ps != lastPreparedStatement && 
lastPreparedStatement != null) {
+                    if (ps != lastPreparedStatement && lastPreparedStatement 
!= null) {
                         batchIndex++;
                         log.debug("Executing query {} because Statement Type 
changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: 
{}",
                             sql, flowFile, fieldIndexes, batchIndex, 
currentBatchSize);
@@ -1032,7 +1139,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
             final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger());
 
             if (SQL_TYPE.equalsIgnoreCase(statementType)) {
-                executeSQL(context, flowFile, connection, recordReader);
+                executeSQL(context, session, flowFile, connection, 
recordReader);
             } else {
                 final DMLSettings settings = new DMLSettings(context);
                 executeDML(context, session, flowFile, connection, 
recordReader, statementType, settings);
@@ -1493,6 +1600,28 @@ public class PutDatabaseRecord extends AbstractProcessor 
{
         return normalizedKeyColumnNames;
     }
 
+    private Optional<Boolean> supportsBatchUpdates = Optional.empty();
+
+    private void initializeSupportBatchUpdates(Connection connection) {
+        if (!supportsBatchUpdates.isPresent()) {
+            try {
+                final DatabaseMetaData dmd = connection.getMetaData();
+                supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
+                getLogger().debug(String.format("Connection 
supportsBatchUpdates is %s",
+                        supportsBatchUpdates.orElse(Boolean.FALSE)));
+            } catch (Exception ex) {
+                supportsBatchUpdates = Optional.of(Boolean.FALSE);
+                getLogger().debug(String.format("Exception while testing if 
connection supportsBatchUpdates due to %s - %s",
+                        ex.getClass().getName(), ex.getMessage()));
+            }
+        }
+    }
+
+    private boolean isSupportBatchUpdates(Connection connection) {
+        initializeSupportBatchUpdates(connection);
+        return supportsBatchUpdates.orElse(Boolean.FALSE);
+    }
+
     static class SchemaKey {
         private final String catalog;
         private final String schemaName;
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 70a908b377..d9ef31733f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -37,9 +37,11 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentMatchers;
 import org.mockito.stubbing.Answer;
 
@@ -68,6 +70,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -86,7 +89,51 @@ import static org.mockito.Mockito.when;
 
 public class PutDatabaseRecordTest {
 
-    private static String DBCP_SERVICE_ID = "dbcp";
+    private enum TestCaseEnum {
+        // ENABLED means to use that test case in the parameterized tests.
+        // DISABLED test cases are used for single-run tests which are not 
parameterized
+        DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
+        DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
+        DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
+        DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
+
+        ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
+        ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
+        ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
+        ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
+
+        // If autoCommit equals true, then rollbackOnFailure must be false AND 
batchSize must equal 0
+        AUTO_COMMIT_0(ENABLED, new TestCase(true, false, 0));
+
+        private final boolean enabled;
+        private final TestCase testCase;
+
+        TestCaseEnum(boolean enabled, TestCase t) {
+            this.enabled = enabled;
+            this.testCase = t;
+        }
+
+        public boolean isEnabled() {
+            return enabled;
+        }
+
+        public TestCase getTestCase() {
+            return testCase;
+        }
+
+    }
+
+    static Stream<Arguments> getTestCases() {
+        return Arrays.stream(TestCaseEnum.values())
+                .filter(TestCaseEnum::isEnabled)
+                .map(TestCaseEnum::getTestCase)
+                .map(Arguments::of);
+    }
+
+    private final static boolean ENABLED = true;
+    private final static boolean DISABLED = false;
+
+    private final static String DBCP_SERVICE_ID = "dbcp";
 
     private static final String CONNECTION_FAILED = "Connection Failed";
 
@@ -141,8 +188,7 @@ public class PutDatabaseRecordTest {
         System.clearProperty("derby.stream.error.file");
     }
 
-    @BeforeEach
-    public void setRunner() throws Exception {
+    private void setRunner(TestCase testCase) throws InitializationException {
         processor = new PutDatabaseRecord();
         //Mock the DBCP Controller Service so we can control the Results
         dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
@@ -153,10 +199,15 @@ public class PutDatabaseRecordTest {
         runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
         runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
+        runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, 
testCase.getAutoCommitAsString());
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 
testCase.getRollbackOnFailureAsString());
+        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, 
testCase.getBatchSizeAsString());
     }
 
     @Test
     public void testGetConnectionFailure() throws InitializationException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService(PARSER_ID, parser);
         runner.enableControllerService(parser);
@@ -175,6 +226,8 @@ public class PutDatabaseRecordTest {
 
     @Test
     public void testSetAutoCommitFalseFailure() throws 
InitializationException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
+
         dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION);
         final Map<String, String> dbcpProperties = new HashMap<>();
         runner = TestRunners.newTestRunner(processor);
@@ -211,6 +264,8 @@ public class PutDatabaseRecordTest {
 
     @Test
     public void testInsertNonRequiredColumnsUnmatchedField() throws 
InitializationException, ProcessException {
+        setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
+
         // Need to override the @Before method with a new processor that 
behaves badly
         processor = new PutDatabaseRecordUnmatchedField();
         //Mock the DBCP Controller Service so we can control the Results
@@ -257,7 +312,8 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testGeneratePreparedStatements() throws SQLException, 
MalformedRecordException {
+    public void testGeneratePreparedStatements() throws 
InitializationException, SQLException, MalformedRecordException {
+        setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
 
         final List<RecordField> fields = Arrays.asList(new RecordField("id", 
RecordFieldType.INT.getDataType()),
                 new RecordField("name", RecordFieldType.STRING.getDataType()),
@@ -295,7 +351,8 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testGeneratePreparedStatementsFailUnmatchedField() {
+    public void testGeneratePreparedStatementsFailUnmatchedField() throws 
InitializationException {
+        setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
 
         final List<RecordField> fields = Arrays.asList(new RecordField("id", 
RecordFieldType.INT.getDataType()),
                 new RecordField("name", RecordFieldType.STRING.getDataType()),
@@ -340,8 +397,11 @@ public class PutDatabaseRecordTest {
         assertEquals("Cannot map field 'non_existing' to any column in the 
database\nColumns: id,name,code", e.getMessage());
     }
 
-    @Test
-    void testInsert() throws InitializationException, ProcessException, 
SQLException, IOException {
+    @ParameterizedTest()
+    @MethodSource("getTestCases")
+    public void testInsert(TestCase testCase) throws InitializationException, 
ProcessException, SQLException {
+        setRunner(testCase);
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -406,7 +466,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertNonRequiredColumns() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertNonRequiredColumns() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -471,7 +533,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertBatchUpdateException() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertBatchUpdateException() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -483,7 +547,8 @@ public class PutDatabaseRecordTest {
 
         parser.addRecord(1, "rec1", 101);
         parser.addRecord(2, "rec2", 102);
-        parser.addRecord(3, "rec3", 1000);   // This record violates the 
constraint on the "code" column so should result in FlowFile being routed to 
failure
+        // This record violates the constraint on the "code" column so should 
result in FlowFile routing to failure
+        parser.addRecord(3, "rec3", 1000);
         parser.addRecord(4, "rec4", 104);
 
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
@@ -505,7 +570,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertBatchUpdateExceptionRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException {
+    public void testInsertBatchUpdateExceptionRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.ROLLBACK_1000.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -523,7 +590,6 @@ public class PutDatabaseRecordTest {
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
         runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
-        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
 
         runner.enqueue(new byte[0]);
         runner.run();
@@ -539,7 +605,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertNoTableSpecified() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertNoTableSpecified() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -563,7 +631,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertNoTableExists() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertNoTableExists() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -587,10 +657,46 @@ public class PutDatabaseRecordTest {
         MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0);
         final String errorMessage = 
flowFile.getAttribute("putdatabaserecord.error");
         assertTrue(errorMessage.contains("PERSONS2"));
+        runner.enqueue();
     }
 
-    @Test
-    void testInsertViaSqlStatementType() throws InitializationException, 
ProcessException, SQLException {
+    // One record carrying a single INSERT, with ALLOW_MULTIPLE_STATEMENTS set to false by the helper.
+    @ParameterizedTest
+    @MethodSource("getTestCases")
+    public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
+        final String[] statements = {
+                "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
+        };
+        testInsertViaSqlTypeStatements(statements, false);
+    }
+
+    // Two INSERT records, the second terminated by ';', with ALLOW_MULTIPLE_STATEMENTS set to true.
+    @ParameterizedTest
+    @MethodSource("getTestCases")
+    public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase) throws InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
+        final String[] statements = {
+                "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
+                "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
+        };
+        testInsertViaSqlTypeStatements(statements, true);
+    }
+
+    // Two INSERT records plus an UPDATE that writes code 101 back to row 1; one statement per record.
+    @ParameterizedTest
+    @MethodSource("getTestCases")
+    public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
+        final String[] statements = {
+                "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
+                "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)",
+                "UPDATE PERSONS SET code = 101 WHERE id = 1"
+        };
+        testInsertViaSqlTypeStatements(statements, false);
+    }
+
+    void testInsertViaSqlTypeStatements(String[] sqlStatements, boolean 
allowMultipleStatements) throws InitializationException, ProcessException, 
SQLException {
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -598,13 +704,15 @@ public class PutDatabaseRecordTest {
 
         parser.addSchemaField("sql", RecordFieldType.STRING);
 
-        parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 
'rec1',101)");
-        parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (2, 
'rec2',102)");
+        for (String sqlStatement : sqlStatements) {
+            parser.addRecord(sqlStatement);
+        }
 
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.USE_ATTR_TYPE);
         runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
         runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
+        runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 
String.valueOf(allowMultipleStatements));
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
@@ -615,22 +723,48 @@ public class PutDatabaseRecordTest {
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
         final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
-        assertTrue(rs.next());
-        assertEquals(1, rs.getInt(1));
-        assertEquals("rec1", rs.getString(2));
-        assertEquals(101, rs.getInt(3));
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt(1));
-        assertEquals("rec2", rs.getString(2));
-        assertEquals(102, rs.getInt(3));
+        if (sqlStatements.length >= 1) {
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals("rec1", rs.getString(2));
+            assertEquals(101, rs.getInt(3));
+        }
+        if (sqlStatements.length >= 2) {
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("rec2", rs.getString(2));
+            assertEquals(102, rs.getInt(3));
+        }
         assertFalse(rs.next());
 
         stmt.close();
         conn.close();
     }
 
-    @Test
-    void testMultipleInsertsViaSqlStatementType() throws 
InitializationException, ProcessException, SQLException {
+    // A single INSERT handed to the helper, which joins its input with ';' into one record.
+    @ParameterizedTest
+    @MethodSource("getTestCases")
+    public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
+        final String[] statements = {
+                "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
+        };
+        testMultipleStatementsViaSqlStatementType(statements);
+    }
+
+    // Two INSERTs (the last one ending in ';') that the helper joins with ';' into one record.
+    @ParameterizedTest
+    @MethodSource("getTestCases")
+    public void testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase) throws InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
+        final String[] statements = {
+                "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
+                "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
+        };
+        testMultipleStatementsViaSqlStatementType(statements);
+    }
+
+    void testMultipleStatementsViaSqlStatementType(String[] sqlStatements) 
throws InitializationException, ProcessException, SQLException {
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -638,7 +772,7 @@ public class PutDatabaseRecordTest {
 
         parser.addSchemaField("sql", RecordFieldType.STRING);
 
-        parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 
'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)");
+        parser.addRecord(String.join(";", sqlStatements));
 
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.USE_ATTR_TYPE);
@@ -655,14 +789,18 @@ public class PutDatabaseRecordTest {
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
         final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
-        assertTrue(rs.next());
-        assertEquals(1, rs.getInt(1));
-        assertEquals("rec1", rs.getString(2));
-        assertEquals(101, rs.getInt(3));
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt(1));
-        assertEquals("rec2", rs.getString(2));
-        assertEquals(102, rs.getInt(3));
+        if (sqlStatements.length >= 1) {
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals("rec1", rs.getString(2));
+            assertEquals(101, rs.getInt(3));
+        }
+        if (sqlStatements.length >= 2) {
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("rec2", rs.getString(2));
+            assertEquals(102, rs.getInt(3));
+        }
         assertFalse(rs.next());
 
         stmt.close();
@@ -670,7 +808,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testMultipleInsertsViaSqlStatementTypeBadSQL() throws 
InitializationException, ProcessException, SQLException {
+    public void testMultipleInsertsViaSqlStatementTypeBadSQL() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -679,8 +819,8 @@ public class PutDatabaseRecordTest {
         parser.addSchemaField("sql", RecordFieldType.STRING);
 
         parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1, 
'rec1',101);" +
-                        "INSERT INTO PERSONS (id, name, code) VALUES (2, 
'rec2',102);" +
-                        "INSERT INTO PERSONS2 (id, name, code) VALUES (2, 
'rec2',102);");
+                "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);" 
+
+                "INSERT INTO PERSONS2 (id, name, code) VALUES (2, 
'rec2',102);");
 
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.USE_ATTR_TYPE);
@@ -706,7 +846,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInvalidData() throws InitializationException, ProcessException, 
SQLException {
+    public void testInvalidData() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -733,15 +875,19 @@ public class PutDatabaseRecordTest {
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
         final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
-        // Transaction should be rolled back and table should remain empty.
-        assertFalse(rs.next());
-
-        stmt.close();
-        conn.close();
+        try {
+            // Transaction should be rolled back and table should remain empty.
+            assertFalse(rs.next());
+        } finally {
+            stmt.close();
+            conn.close();
+        }
     }
 
     @Test
-    void testIOExceptionOnReadData() throws InitializationException, 
ProcessException, SQLException {
+    public void testIOExceptionOnReadData() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -768,15 +914,58 @@ public class PutDatabaseRecordTest {
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
         final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
-        // Transaction should be rolled back and table should remain empty.
-        assertFalse(rs.next());
+        try {
+            // Transaction should be rolled back and table should remain empty.
+            assertFalse(rs.next());
+        } finally {
+            stmt.close();
+            conn.close();
+        }
+    }
 
-        stmt.close();
-        conn.close();
+    // Read-failure check run with the TestCaseEnum.AUTO_COMMIT_0 configuration: the mock reader is
+    // armed with failAfter(1, IO_EXCEPTION), the flow file must be routed to REL_FAILURE, and the
+    // PERSONS table must contain no rows afterwards.
+    @Test
+    public void testIOExceptionOnReadDataAutoCommit() throws InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase());
+
+        recreateTable(createPersons);
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.INT);
+
+        parser.addRecord(1, "rec1", 101);
+        parser.addRecord(2, "rec2", 102);
+        parser.addRecord(3, "rec3", 104);
+
+        parser.failAfter(1, MockRecordFailureType.IO_EXCEPTION);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE, 1);
+        // try-with-resources so the connection is released even if the query itself throws.
+        try (final Connection conn = dbcp.getConnection();
+             final Statement stmt = conn.createStatement();
+             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
+            // The table is expected to be empty after the failed read.
+            assertFalse(rs.next());
+        }
     }
 
     @Test
-    void testSqlStatementTypeNoValue() throws InitializationException, 
ProcessException, SQLException {
+    public void testSqlStatementTypeNoValue() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -801,7 +990,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testSqlStatementTypeNoValueRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException {
+    public void testSqlStatementTypeNoValueRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -815,7 +1006,6 @@ public class PutDatabaseRecordTest {
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.USE_ATTR_TYPE);
         runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
         runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
-        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
@@ -827,8 +1017,11 @@ public class PutDatabaseRecordTest {
         runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0);
     }
 
-    @Test
-    void testUpdate() throws InitializationException, ProcessException, 
SQLException {
+    @ParameterizedTest()
+    @MethodSource("getTestCases")
+    public void testUpdate(TestCase testCase) throws InitializationException, 
ProcessException, SQLException {
+        setRunner(testCase);
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -873,7 +1066,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testUpdatePkNotFirst() throws InitializationException, 
ProcessException, SQLException {
+    public void testUpdatePkNotFirst() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable("CREATE TABLE PERSONS (name varchar(100), id integer 
primary key, code integer)");
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -917,8 +1112,11 @@ public class PutDatabaseRecordTest {
         conn.close();
     }
 
-    @Test
-    void testUpdateMultipleSchemas() throws InitializationException, 
ProcessException, SQLException {
+    @ParameterizedTest()
+    @MethodSource("getTestCases")
+    public void testUpdateMultipleSchemas(TestCase testCase) throws 
InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
+
         // Manually create and drop the tables and schemas
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -986,8 +1184,11 @@ public class PutDatabaseRecordTest {
         conn.close();
     }
 
-    @Test
-    void testUpdateAfterInsert() throws InitializationException, 
ProcessException, SQLException {
+    @ParameterizedTest()
+    @MethodSource("getTestCases")
+    public void testUpdateAfterInsert(TestCase testCase) throws 
InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1046,7 +1247,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testUpdateNoPrimaryKeys() throws InitializationException, 
ProcessException, SQLException {
+    public void testUpdateNoPrimaryKeys() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer)");
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1067,7 +1270,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testUpdateSpecifyUpdateKeys() throws InitializationException, 
ProcessException, SQLException {
+    public void testUpdateSpecifyUpdateKeys() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer)");
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1113,7 +1318,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException, 
ProcessException, SQLException {
+    public void testUpdateSpecifyUpdateKeysNotFirst() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
+
         recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer)");
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1159,7 +1366,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, 
ProcessException, SQLException {
+    public void testUpdateSpecifyQuotedUpdateKeys() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
+
         recreateTable("CREATE TABLE PERSONS (\"id\" integer, \"name\" 
varchar(100), \"code\" integer)");
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1205,8 +1414,11 @@ public class PutDatabaseRecordTest {
         conn.close();
     }
 
-    @Test
-    void testDelete() throws InitializationException, ProcessException, 
SQLException {
+    @ParameterizedTest()
+    @MethodSource("getTestCases")
+    public void testDelete(TestCase testCase) throws InitializationException, 
ProcessException, SQLException {
+        setRunner(testCase);
+
         recreateTable(createPersons);
         Connection conn = dbcp.getConnection();
         Statement stmt = conn.createStatement();
@@ -1250,7 +1462,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testDeleteWithNulls() throws InitializationException, 
ProcessException, SQLException {
+    public void testDeleteWithNulls() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
+
         recreateTable(createPersons);
         Connection conn = dbcp.getConnection();
         Statement stmt = conn.createStatement();
@@ -1294,7 +1508,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testRecordPathOptions() throws InitializationException, SQLException {
+    public void testRecordPathOptions() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer)");
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1347,7 +1563,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithMaxBatchSize() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertWithMaxBatchSize() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1380,7 +1598,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithDefaultMaxBatchSize() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertWithDefaultMaxBatchSize() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1397,6 +1617,7 @@ public class PutDatabaseRecordTest {
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
         runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, 
PutDatabaseRecord.MAX_BATCH_SIZE.getDefaultValue());
 
         Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
 
@@ -1412,7 +1633,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testGenerateTableName() throws Exception {
+    public void testGenerateTableName() throws InitializationException, 
ProcessException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         final List<RecordField> fields = Arrays.asList(new RecordField("id", 
RecordFieldType.INT.getDataType()),
                 new RecordField("name", RecordFieldType.STRING.getDataType()),
                 new RecordField("code", RecordFieldType.INT.getDataType()),
@@ -1446,7 +1669,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertMismatchedCompatibleDataTypes() throws 
InitializationException, ProcessException, SQLException {
+    public void testInsertMismatchedCompatibleDataTypes() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1514,7 +1739,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertMismatchedNotCompatibleDataTypes() throws 
InitializationException, ProcessException, SQLException {
+    public void testInsertMismatchedNotCompatibleDataTypes() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1547,7 +1774,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testLongVarchar() throws InitializationException, ProcessException, 
SQLException {
+    public void testLongVarchar() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         // Manually create and drop the tables and schemas
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -1590,7 +1819,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithDifferentColumnOrdering() throws 
InitializationException, ProcessException, SQLException {
+    public void testInsertWithDifferentColumnOrdering() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         // Manually create and drop the tables and schemas
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -1637,7 +1868,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithBlobClob() throws Exception {
+    public void testInsertWithBlobClob() throws InitializationException, 
ProcessException, SQLException, IOException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary 
key, name clob," +
                 "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code 
>= 0 AND code < 1000))";
 
@@ -1688,7 +1921,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertHexStringIntoBinary() throws Exception {
+    public void testInsertHexStringIntoBinary() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, 
PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL);
 
         String tableName = "HEX_STRING_TEST";
@@ -1729,7 +1964,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertBase64StringIntoBinary() throws Exception {
+    public void testInsertBase64StringIntoBinary() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, 
PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64);
 
         String tableName = "BASE64_STRING_TEST";
@@ -1770,7 +2007,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithBlobClobObjectArraySource() throws Exception {
+    public void testInsertWithBlobClobObjectArraySource() throws 
InitializationException, ProcessException, SQLException, IOException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary 
key, name clob," +
                 "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code 
>= 0 AND code < 1000))";
 
@@ -1821,7 +2060,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithBlobStringSource() throws Exception {
+    public void testInsertWithBlobStringSource() throws 
InitializationException, ProcessException, SQLException, IOException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary 
key, name clob," +
                 "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code 
>= 0 AND code < 1000))";
 
@@ -1866,7 +2107,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithBlobIntegerArraySource() throws Exception {
+    public void testInsertWithBlobIntegerArraySource() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary 
key, name clob," +
                 "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code 
>= 0 AND code < 1000))";
 
@@ -1895,7 +2138,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertEnum() throws InitializationException, ProcessException, 
SQLException, IOException {
+    public void testInsertEnum() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
         runner = TestRunners.newTestRunner(processor);
         runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>());
@@ -1940,7 +2185,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertUUIDColumn() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertUUIDColumn() throws InitializationException, 
ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         // Manually create and drop the tables and schemas
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -1980,7 +2227,9 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertLongVarBinaryColumn() throws InitializationException, 
ProcessException, SQLException {
+    public void testInsertLongVarBinaryColumn() throws 
InitializationException, ProcessException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
         // Manually create and drop the tables and schemas
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -2023,7 +2272,7 @@ public class PutDatabaseRecordTest {
 
     private void recreateTable() throws ProcessException {
         try (final Connection conn = dbcp.getConnection();
-            final Statement stmt = conn.createStatement()) {
+             final Statement stmt = conn.createStatement()) {
             stmt.execute("drop table PERSONS");
             stmt.execute(createPersons);
         } catch (SQLException ignore) {
@@ -2053,9 +2302,9 @@ public class PutDatabaseRecordTest {
         } catch (SQLException ignore) {
             // Do nothing, may not have existed
         }
-        stmt.execute(createSQL);
-        stmt.close();
-        conn.close();
+        try (conn; stmt) {
+            stmt.execute(createSQL);
+        }
     }
 
     private Map<String, Object> createValues(final int id, final String name, 
final int code) {
@@ -2109,4 +2358,33 @@ public class PutDatabaseRecordTest {
             }
         }
     }
+
+    /** Immutable settings (auto commit, rollback on failure, batch size) for one test run; each value may be null. */
+    public static class TestCase {
+        private final Boolean autoCommit;
+        private final Boolean rollbackOnFailure;
+        private final Integer batchSize;
+        TestCase(Boolean autoCommit, Boolean rollbackOnFailure, Integer batchSize) {
+            this.autoCommit = autoCommit;
+            this.rollbackOnFailure = rollbackOnFailure;
+            this.batchSize = batchSize;
+        }
+
+        String getAutoCommitAsString() {
+            return autoCommit == null ? null : autoCommit.toString();
+        }
+
+        String getRollbackOnFailureAsString() {
+            return rollbackOnFailure == null ? null : rollbackOnFailure.toString();
+        }
+
+        String getBatchSizeAsString() {
+            return batchSize == null ? null : batchSize.toString();
+        }
+
+        @Override
+        public String toString() {
+            return "autoCommit=" + autoCommit + "; rollbackOnFailure=" + rollbackOnFailure + "; batchSize=" + batchSize;
+        }
+    }
 }
\ No newline at end of file

Reply via email to