This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1514890371 NIFI-12993 Add auto commit feature and add batch processing
for the sql stmt type
1514890371 is described below
commit 15148903712761e52868b59060ba2235486f0523
Author: Jim Steinebrey <[email protected]>
AuthorDate: Tue Apr 2 14:51:34 2024 -0400
NIFI-12993 Add auto commit feature and add batch processing for the sql
stmt type
NIFI-12993 Removed underscore from a few local variables.
NIFI-12993 Refactored unit tests into a single java file
NIFI-12993 Changed Optional.isEmpty() to !Optional.isPresent() so it can
work in NiFi 1.x on Java 8
Signed-off-by: Matt Burgess <[email protected]>
This closes #8597
---
.../processors/standard/PutDatabaseRecord.java | 243 ++++++++---
.../processors/standard/PutDatabaseRecordTest.java | 458 +++++++++++++++++----
2 files changed, 554 insertions(+), 147 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 26b4281cd0..79b98d20e2 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -27,6 +27,7 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
@@ -376,6 +377,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
+ static final PropertyDescriptor AUTO_COMMIT = new
PropertyDescriptor.Builder()
+ .name("database-session-autocommit")
+ .displayName("Database Session AutoCommit")
+ .description("The autocommit mode to set on the database
connection being used. If set to false, the operation(s) will be explicitly
committed or rolled back "
+ + "(based on success or failure respectively). If set to
true, the driver/database automatically handles the commit/rollback.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(false)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
static final PropertyDescriptor DB_TYPE;
protected static final Map<String, DatabaseAdapter> dbAdapters;
@@ -431,6 +443,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
pds.add(TABLE_SCHEMA_CACHE_SIZE);
pds.add(MAX_BATCH_SIZE);
+ pds.add(AUTO_COMMIT);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -466,9 +479,41 @@ public class PutDatabaseRecord extends AbstractProcessor {
);
}
+ final boolean autoCommit =
validationContext.getProperty(AUTO_COMMIT).asBoolean();
+ final boolean rollbackOnFailure =
validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+ if (autoCommit && rollbackOnFailure) {
+ validationResults.add(new ValidationResult.Builder()
+
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
+ .explanation(format("'%s' cannot be set to 'true' when
'%s' is also set to 'true'. "
+ + "Transaction rollbacks for batch updates
cannot be supported when auto commit is set to 'true'",
+
RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(),
AUTO_COMMIT.getDisplayName()))
+ .build());
+ }
+
+ if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
+ final String explanation = format("'%s' must be hard-coded to
zero when '%s' is set to 'true'."
+ + " Batch size equal to zero executes all
statements in a single transaction"
+ + " which allows automatic rollback to revert
all statements if an error occurs",
+ MAX_BATCH_SIZE.getDisplayName(),
AUTO_COMMIT.getDisplayName());
+
+ validationResults.add(new ValidationResult.Builder()
+ .subject(MAX_BATCH_SIZE.getDisplayName())
+ .explanation(explanation)
+ .build());
+ }
+
return validationResults;
}
+ private boolean isMaxBatchSizeHardcodedToZero(ValidationContext
validationContext) {
+ try {
+ return
!validationContext.getProperty(MAX_BATCH_SIZE).isExpressionLanguagePresent()
+ && 0 ==
validationContext.getProperty(MAX_BATCH_SIZE).asInteger();
+ } catch (Exception ex) {
+ return false;
+ }
+ }
+
@OnScheduled
public void onScheduled(final ProcessContext context) {
databaseAdapter =
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
@@ -490,6 +535,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
dataRecordPath = dataRecordPathValue == null ? null :
RecordPath.compile(dataRecordPathValue);
}
+ @OnUnscheduled
+ public final void onUnscheduled() {
+ supportsBatchUpdates = Optional.empty();
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
FlowFile flowFile = session.get();
@@ -498,84 +548,98 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- Optional<Connection> connectionHolder = Optional.empty();
+ Connection connection = null;
boolean originalAutoCommit = false;
try {
- final Connection connection =
dbcpService.getConnection(flowFile.getAttributes());
- connectionHolder = Optional.of(connection);
+ connection = dbcpService.getConnection(flowFile.getAttributes());
originalAutoCommit = connection.getAutoCommit();
- if (originalAutoCommit) {
+ final boolean autoCommit =
context.getProperty(AUTO_COMMIT).asBoolean();
+ if (originalAutoCommit != autoCommit) {
try {
- connection.setAutoCommit(false);
+ connection.setAutoCommit(autoCommit);
} catch (SQLFeatureNotSupportedException sfnse) {
- getLogger().debug("setAutoCommit(false) not supported by
this driver");
+ getLogger().debug(String.format("setAutoCommit(%s) not
supported by this driver", autoCommit), sfnse);
}
}
putToDatabase(context, session, flowFile, connection);
+
// Only commit the connection if auto-commit is false
- if (!originalAutoCommit) {
+ if (!connection.getAutoCommit()) {
connection.commit();
}
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile,
getJdbcUrl(connection));
} catch (final Exception e) {
- // When an Exception is thrown, we want to route to 'retry' if we
expect that attempting the same request again
- // might work. Otherwise, route to failure. SQLTransientException
is a specific type that indicates that a retry may work.
- final Relationship relationship;
- final Throwable toAnalyze = (e instanceof BatchUpdateException) ?
e.getCause() : e;
- if (toAnalyze instanceof SQLTransientException) {
- relationship = REL_RETRY;
- flowFile = session.penalize(flowFile);
- } else {
- relationship = REL_FAILURE;
- }
+ routeOnException(context, session, connection, e, flowFile);
+ } finally {
+ closeConnection(connection, originalAutoCommit);
+ }
+ }
- getLogger().error("Failed to put Records to database for {}.
Routing to {}.", flowFile, relationship, e);
+ private void routeOnException(final ProcessContext context, final
ProcessSession session,
+ Connection connection, Exception e, FlowFile
flowFile) {
+ // When an Exception is thrown, we want to route to 'retry' if we
expect that attempting the same request again
+ // might work. Otherwise, route to failure. SQLTransientException is a
specific type that indicates that a retry may work.
+ final Relationship relationship;
+ final Throwable toAnalyze = (e instanceof BatchUpdateException) ?
e.getCause() : e;
+ if (toAnalyze instanceof SQLTransientException) {
+ relationship = REL_RETRY;
+ flowFile = session.penalize(flowFile);
+ } else {
+ relationship = REL_FAILURE;
+ }
- final boolean rollbackOnFailure =
context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
- if (rollbackOnFailure) {
- session.rollback();
- } else {
- flowFile = session.putAttribute(flowFile,
PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown":
e.getMessage()));
- session.transfer(flowFile, relationship);
+ getLogger().error("Failed to put Records to database for {}. Routing
to {}.", flowFile, relationship, e);
+
+ final boolean rollbackOnFailure =
context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+ if (rollbackOnFailure) {
+ session.rollback();
+ } else {
+ flowFile = session.putAttribute(flowFile,
PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown":
e.getMessage()));
+ session.transfer(flowFile, relationship);
+ }
+
+ rollbackConnection(connection);
+ }
+
+ private void rollbackConnection(Connection connection) {
+ if (connection != null) {
+ try {
+ if (!connection.getAutoCommit()) {
+ connection.rollback();
+ }
+ } catch (final Exception rollbackException) {
+ getLogger().error("Failed to rollback JDBC transaction",
rollbackException);
}
+ }
+ }
- connectionHolder.ifPresent(connection -> {
- try {
- if (!connection.getAutoCommit()) {
- connection.rollback();
- }
- } catch (final Exception rollbackException) {
- getLogger().error("Failed to rollback JDBC transaction",
rollbackException);
+ private void closeConnection(Connection connection, boolean
originalAutoCommit) {
+ if (connection != null) {
+ try {
+ if (originalAutoCommit != connection.getAutoCommit()) {
+ connection.setAutoCommit(originalAutoCommit);
}
- });
- } finally {
- if (originalAutoCommit) {
- connectionHolder.ifPresent(connection -> {
- try {
- connection.setAutoCommit(true);
- } catch (final Exception autoCommitException) {
- getLogger().warn("Failed to set auto-commit back to
true on connection", autoCommitException);
- }
- });
+ } catch (final Exception autoCommitException) {
+ getLogger().warn(String.format("Failed to set auto-commit back
to %s on connection", originalAutoCommit), autoCommitException);
}
- connectionHolder.ifPresent(connection -> {
- try {
+ try {
+ if (!connection.isClosed()) {
connection.close();
- } catch (final Exception closeException) {
- getLogger().warn("Failed to close database connection",
closeException);
}
- });
+ } catch (final Exception closeException) {
+ getLogger().warn("Failed to close database connection",
closeException);
+ }
}
}
-
- private void executeSQL(final ProcessContext context, final FlowFile
flowFile, final Connection connection, final RecordReader recordReader)
+ private void executeSQL(final ProcessContext context, final ProcessSession
session, final FlowFile flowFile,
+ final Connection connection, final RecordReader
recordReader)
throws IllegalArgumentException, MalformedRecordException,
IOException, SQLException {
final RecordSchema recordSchema = recordReader.getSchema();
@@ -602,23 +666,66 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
}
- Record currentRecord;
- while ((currentRecord = recordReader.nextRecord()) != null) {
+ final ComponentLog log = getLogger();
+ final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
+ // Do not use batch if set to batch size of 1 because that is
similar to not using batching.
+ // Also do not use batches if the connection does not support
batching.
+ boolean useBatch = maxBatchSize != 1 &&
isSupportBatchUpdates(connection);
+ int currentBatchSize = 0;
+ int batchIndex = 0;
+
+ boolean isFirstRecord = true;
+ Record nextRecord = recordReader.nextRecord();
+ while (nextRecord != null) {
+ Record currentRecord = nextRecord;
final String sql = currentRecord.getAsString(sqlField);
+ nextRecord = recordReader.nextRecord();
+
if (sql == null || StringUtils.isEmpty(sql)) {
throw new MalformedRecordException(format("Record had no
(or null) value for Field Containing SQL: %s, FlowFile %s", sqlField,
flowFile));
}
- // Execute the statement(s) as-is
+ final String[] sqlStatements;
if
(context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) {
final String regex = "(?<!\\\\);";
- final String[] sqlStatements = (sql).split(regex);
- for (String sqlStatement : sqlStatements) {
+ sqlStatements = (sql).split(regex);
+ } else {
+ sqlStatements = new String[] { sql };
+ }
+
+ if (isFirstRecord) {
+ // If there is only one sql statement to process, then do
not use batching.
+ if (nextRecord == null && sqlStatements.length == 1) {
+ useBatch = false;
+ }
+ isFirstRecord = false;
+ }
+
+ for (String sqlStatement : sqlStatements) {
+ if (useBatch) {
+ currentBatchSize++;
+ statement.addBatch(sqlStatement);
+ } else {
statement.execute(sqlStatement);
}
- } else {
- statement.execute(sql);
}
+
+ if (useBatch && maxBatchSize > 0 && currentBatchSize >=
maxBatchSize) {
+ batchIndex++;
+ log.debug("Executing batch with last query {} because
batch reached max size %s for {}; batch index: {}; batch size: {}",
+ sql, maxBatchSize, flowFile, batchIndex,
currentBatchSize);
+ statement.executeBatch();
+ session.adjustCounter("Batches Executed", 1, false);
+ currentBatchSize = 0;
+ }
+ }
+
+ if (useBatch && currentBatchSize > 0) {
+ batchIndex++;
+ log.debug("Executing last batch because last statement reached
for {}; batch index: {}; batch size: {}",
+ flowFile, batchIndex, currentBatchSize);
+ statement.executeBatch();
+ session.adjustCounter("Batches Executed", 1, false);
}
}
}
@@ -730,7 +837,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final List<Integer> fieldIndexes =
preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
final String sql =
preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
- if (currentBatchSize > 0 && ps != lastPreparedStatement &&
lastPreparedStatement != null) {
+ if (ps != lastPreparedStatement && lastPreparedStatement
!= null) {
batchIndex++;
log.debug("Executing query {} because Statement Type
changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size:
{}",
sql, flowFile, fieldIndexes, batchIndex,
currentBatchSize);
@@ -1032,7 +1139,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger());
if (SQL_TYPE.equalsIgnoreCase(statementType)) {
- executeSQL(context, flowFile, connection, recordReader);
+ executeSQL(context, session, flowFile, connection,
recordReader);
} else {
final DMLSettings settings = new DMLSettings(context);
executeDML(context, session, flowFile, connection,
recordReader, statementType, settings);
@@ -1493,6 +1600,28 @@ public class PutDatabaseRecord extends AbstractProcessor
{
return normalizedKeyColumnNames;
}
+ private Optional<Boolean> supportsBatchUpdates = Optional.empty();
+
+ private void initializeSupportBatchUpdates(Connection connection) {
+ if (!supportsBatchUpdates.isPresent()) {
+ try {
+ final DatabaseMetaData dmd = connection.getMetaData();
+ supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
+ getLogger().debug(String.format("Connection
supportsBatchUpdates is %s",
+ supportsBatchUpdates.orElse(Boolean.FALSE)));
+ } catch (Exception ex) {
+ supportsBatchUpdates = Optional.of(Boolean.FALSE);
+ getLogger().debug(String.format("Exception while testing if
connection supportsBatchUpdates due to %s - %s",
+ ex.getClass().getName(), ex.getMessage()));
+ }
+ }
+ }
+
+ private boolean isSupportBatchUpdates(Connection connection) {
+ initializeSupportBatchUpdates(connection);
+ return supportsBatchUpdates.orElse(Boolean.FALSE);
+ }
+
static class SchemaKey {
private final String catalog;
private final String schemaName;
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 70a908b377..d9ef31733f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -37,9 +37,11 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
@@ -68,6 +70,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -86,7 +89,51 @@ import static org.mockito.Mockito.when;
public class PutDatabaseRecordTest {
- private static String DBCP_SERVICE_ID = "dbcp";
+ private enum TestCaseEnum {
+ // ENABLED means to use that test case in the parameterized tests.
+ // DISABLED test cases are used for single-run tests which are not
parameterized
+ DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
+ DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
+ DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
+ DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
+
+ ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
+ ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
+ ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
+ ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
+
+ // If autoCommit equals true, then rollbackOnFailure must be false AND
batchSize must equal 0
+ AUTO_COMMIT_0(ENABLED, new TestCase(true, false, 0));
+
+ private final boolean enabled;
+ private final TestCase testCase;
+
+ TestCaseEnum(boolean enabled, TestCase t) {
+ this.enabled = enabled;
+ this.testCase = t;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public TestCase getTestCase() {
+ return testCase;
+ }
+
+ }
+
+ static Stream<Arguments> getTestCases() {
+ return Arrays.stream(TestCaseEnum.values())
+ .filter(TestCaseEnum::isEnabled)
+ .map(TestCaseEnum::getTestCase)
+ .map(Arguments::of);
+ }
+
+ private final static boolean ENABLED = true;
+ private final static boolean DISABLED = false;
+
+ private final static String DBCP_SERVICE_ID = "dbcp";
private static final String CONNECTION_FAILED = "Connection Failed";
@@ -141,8 +188,7 @@ public class PutDatabaseRecordTest {
System.clearProperty("derby.stream.error.file");
}
- @BeforeEach
- public void setRunner() throws Exception {
+ private void setRunner(TestCase testCase) throws InitializationException {
processor = new PutDatabaseRecord();
//Mock the DBCP Controller Service so we can control the Results
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
@@ -153,10 +199,15 @@ public class PutDatabaseRecordTest {
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
+ runner.setProperty(PutDatabaseRecord.AUTO_COMMIT,
testCase.getAutoCommitAsString());
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE,
testCase.getRollbackOnFailureAsString());
+ runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE,
testCase.getBatchSizeAsString());
}
@Test
public void testGetConnectionFailure() throws InitializationException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService(PARSER_ID, parser);
runner.enableControllerService(parser);
@@ -175,6 +226,8 @@ public class PutDatabaseRecordTest {
@Test
public void testSetAutoCommitFalseFailure() throws
InitializationException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
+
dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION);
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
@@ -211,6 +264,8 @@ public class PutDatabaseRecordTest {
@Test
public void testInsertNonRequiredColumnsUnmatchedField() throws
InitializationException, ProcessException {
+ setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
+
// Need to override the @Before method with a new processor that
behaves badly
processor = new PutDatabaseRecordUnmatchedField();
//Mock the DBCP Controller Service so we can control the Results
@@ -257,7 +312,8 @@ public class PutDatabaseRecordTest {
}
@Test
- void testGeneratePreparedStatements() throws SQLException,
MalformedRecordException {
+ public void testGeneratePreparedStatements() throws
InitializationException, SQLException, MalformedRecordException {
+ setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
final List<RecordField> fields = Arrays.asList(new RecordField("id",
RecordFieldType.INT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()),
@@ -295,7 +351,8 @@ public class PutDatabaseRecordTest {
}
@Test
- void testGeneratePreparedStatementsFailUnmatchedField() {
+ public void testGeneratePreparedStatementsFailUnmatchedField() throws
InitializationException {
+ setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
final List<RecordField> fields = Arrays.asList(new RecordField("id",
RecordFieldType.INT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()),
@@ -340,8 +397,11 @@ public class PutDatabaseRecordTest {
assertEquals("Cannot map field 'non_existing' to any column in the
database\nColumns: id,name,code", e.getMessage());
}
- @Test
- void testInsert() throws InitializationException, ProcessException,
SQLException, IOException {
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testInsert(TestCase testCase) throws InitializationException,
ProcessException, SQLException {
+ setRunner(testCase);
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -406,7 +466,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertNonRequiredColumns() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertNonRequiredColumns() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -471,7 +533,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertBatchUpdateException() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertBatchUpdateException() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -483,7 +547,8 @@ public class PutDatabaseRecordTest {
parser.addRecord(1, "rec1", 101);
parser.addRecord(2, "rec2", 102);
- parser.addRecord(3, "rec3", 1000); // This record violates the
constraint on the "code" column so should result in FlowFile being routed to
failure
+ // This record violates the constraint on the "code" column so should
result in FlowFile routing to failure
+ parser.addRecord(3, "rec3", 1000);
parser.addRecord(4, "rec4", 104);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
@@ -505,7 +570,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertBatchUpdateExceptionRollbackOnFailure() throws
InitializationException, ProcessException, SQLException {
+ public void testInsertBatchUpdateExceptionRollbackOnFailure() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.ROLLBACK_1000.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -523,7 +590,6 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
- runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
runner.enqueue(new byte[0]);
runner.run();
@@ -539,7 +605,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertNoTableSpecified() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertNoTableSpecified() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -563,7 +631,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertNoTableExists() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertNoTableExists() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -587,10 +657,46 @@ public class PutDatabaseRecordTest {
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0);
final String errorMessage =
flowFile.getAttribute("putdatabaserecord.error");
assertTrue(errorMessage.contains("PERSONS2"));
+ runner.enqueue();
}
- @Test
- void testInsertViaSqlStatementType() throws InitializationException,
ProcessException, SQLException {
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testInsertViaSqlTypeOneStatement(TestCase testCase) throws
InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
+
+ String[] sqlStatements = new String[] {
+ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
+ };
+ testInsertViaSqlTypeStatements(sqlStatements, false);
+ }
+
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testInsertViaSqlTypeTwoStatementsSemicolon(TestCase testCase)
throws InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
+
+ String[] sqlStatements = new String[] {
+ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
+ "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
+ };
+ testInsertViaSqlTypeStatements(sqlStatements, true);
+ }
+
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testInsertViaSqlTypeThreeStatements(TestCase testCase) throws
InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
+
+ String[] sqlStatements = new String[] {
+ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
+ "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)",
+ "UPDATE PERSONS SET code = 101 WHERE id = 1"
+ };
+ testInsertViaSqlTypeStatements(sqlStatements, false);
+ }
+
+ void testInsertViaSqlTypeStatements(String[] sqlStatements, boolean
allowMultipleStatements) throws InitializationException, ProcessException,
SQLException {
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -598,13 +704,15 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("sql", RecordFieldType.STRING);
- parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1,
'rec1',101)");
- parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (2,
'rec2',102)");
+ for (String sqlStatement : sqlStatements) {
+ parser.addRecord(sqlStatement);
+ }
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.USE_ATTR_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
+ runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS,
String.valueOf(allowMultipleStatements));
final Map<String, String> attrs = new HashMap<>();
attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
@@ -615,22 +723,48 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("rec1", rs.getString(2));
- assertEquals(101, rs.getInt(3));
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals("rec2", rs.getString(2));
- assertEquals(102, rs.getInt(3));
+ if (sqlStatements.length >= 1) {
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("rec1", rs.getString(2));
+ assertEquals(101, rs.getInt(3));
+ }
+ if (sqlStatements.length >= 2) {
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("rec2", rs.getString(2));
+ assertEquals(102, rs.getInt(3));
+ }
assertFalse(rs.next());
stmt.close();
conn.close();
}
- @Test
- void testMultipleInsertsViaSqlStatementType() throws
InitializationException, ProcessException, SQLException {
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testMultipleInsertsForOneStatementViaSqlStatementType(TestCase
testCase) throws InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
+
+ String[] sqlStatements = new String[] {
+ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)"
+ };
+ testMultipleStatementsViaSqlStatementType(sqlStatements);
+ }
+
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void
testMultipleInsertsForTwoStatementsViaSqlStatementType(TestCase testCase)
throws InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
+
+ String[] sqlStatements = new String[] {
+ "INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)",
+ "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
+ };
+ testMultipleStatementsViaSqlStatementType(sqlStatements);
+ }
+
+ void testMultipleStatementsViaSqlStatementType(String[] sqlStatements)
throws InitializationException, ProcessException, SQLException {
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -638,7 +772,7 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("sql", RecordFieldType.STRING);
- parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1,
'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)");
+ parser.addRecord(String.join(";", sqlStatements));
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.USE_ATTR_TYPE);
@@ -655,14 +789,18 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("rec1", rs.getString(2));
- assertEquals(101, rs.getInt(3));
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals("rec2", rs.getString(2));
- assertEquals(102, rs.getInt(3));
+ if (sqlStatements.length >= 1) {
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("rec1", rs.getString(2));
+ assertEquals(101, rs.getInt(3));
+ }
+ if (sqlStatements.length >= 2) {
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("rec2", rs.getString(2));
+ assertEquals(102, rs.getInt(3));
+ }
assertFalse(rs.next());
stmt.close();
@@ -670,7 +808,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testMultipleInsertsViaSqlStatementTypeBadSQL() throws
InitializationException, ProcessException, SQLException {
+ public void testMultipleInsertsViaSqlStatementTypeBadSQL() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -679,8 +819,8 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("sql", RecordFieldType.STRING);
parser.addRecord("INSERT INTO PERSONS (id, name, code) VALUES (1,
'rec1',101);" +
- "INSERT INTO PERSONS (id, name, code) VALUES (2,
'rec2',102);" +
- "INSERT INTO PERSONS2 (id, name, code) VALUES (2,
'rec2',102);");
+ "INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);"
+
+ "INSERT INTO PERSONS2 (id, name, code) VALUES (2,
'rec2',102);");
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.USE_ATTR_TYPE);
@@ -706,7 +846,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInvalidData() throws InitializationException, ProcessException,
SQLException {
+ public void testInvalidData() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -733,15 +875,19 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
- // Transaction should be rolled back and table should remain empty.
- assertFalse(rs.next());
-
- stmt.close();
- conn.close();
+ try {
+ // Transaction should be rolled back and table should remain empty.
+ assertFalse(rs.next());
+ } finally {
+ stmt.close();
+ conn.close();
+ }
}
@Test
- void testIOExceptionOnReadData() throws InitializationException,
ProcessException, SQLException {
+ public void testIOExceptionOnReadData() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -768,15 +914,58 @@ public class PutDatabaseRecordTest {
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
- // Transaction should be rolled back and table should remain empty.
- assertFalse(rs.next());
+ try {
+ // Transaction should be rolled back and table should remain empty.
+ assertFalse(rs.next());
+ } finally {
+ stmt.close();
+ conn.close();
+ }
+ }
- stmt.close();
- conn.close();
+ /**
+  * Runs an INSERT against PERSONS with the AUTO_COMMIT_0 test case while the mock reader is
+  * configured to fail with an IOException after the first record. Expects the flow file to be
+  * routed to failure and the PERSONS table to contain no rows afterwards.
+  */
+ @Test
+ public void testIOExceptionOnReadDataAutoCommit() throws InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.AUTO_COMMIT_0.getTestCase());
+
+ recreateTable(createPersons);
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("code", RecordFieldType.INT);
+
+ parser.addRecord(1, "rec1", 101);
+ parser.addRecord(2, "rec2", 102);
+ parser.addRecord(3, "rec3", 104);
+
+ // Make the reader throw an IOException once the first record has been handed out.
+ parser.failAfter(1, MockRecordFailureType.IO_EXCEPTION);
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE, 1);
+
+ // try-with-resources closes the ResultSet, Statement and Connection (reverse declaration order)
+ // even if createStatement(), executeQuery() or the assertion itself throws.
+ try (final Connection conn = dbcp.getConnection();
+ final Statement stmt = conn.createStatement();
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
+ // The table must remain empty after the failed read.
+ // NOTE(review): the earlier comment said the transaction "should be rolled back"; with the
+ // auto-commit test case there is presumably no explicit transaction to roll back - confirm
+ // against the processor.
+ assertFalse(rs.next());
+ }
}
@Test
- void testSqlStatementTypeNoValue() throws InitializationException,
ProcessException, SQLException {
+ public void testSqlStatementTypeNoValue() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -801,7 +990,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testSqlStatementTypeNoValueRollbackOnFailure() throws
InitializationException, ProcessException, SQLException {
+ public void testSqlStatementTypeNoValueRollbackOnFailure() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.ROLLBACK_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -815,7 +1006,6 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.USE_ATTR_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
- runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> attrs = new HashMap<>();
attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
@@ -827,8 +1017,11 @@ public class PutDatabaseRecordTest {
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0);
}
- @Test
- void testUpdate() throws InitializationException, ProcessException,
SQLException {
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testUpdate(TestCase testCase) throws InitializationException,
ProcessException, SQLException {
+ setRunner(testCase);
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -873,7 +1066,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testUpdatePkNotFirst() throws InitializationException,
ProcessException, SQLException {
+ public void testUpdatePkNotFirst() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable("CREATE TABLE PERSONS (name varchar(100), id integer
primary key, code integer)");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -917,8 +1112,11 @@ public class PutDatabaseRecordTest {
conn.close();
}
- @Test
- void testUpdateMultipleSchemas() throws InitializationException,
ProcessException, SQLException {
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testUpdateMultipleSchemas(TestCase testCase) throws
InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
+
// Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
@@ -986,8 +1184,11 @@ public class PutDatabaseRecordTest {
conn.close();
}
- @Test
- void testUpdateAfterInsert() throws InitializationException,
ProcessException, SQLException {
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testUpdateAfterInsert(TestCase testCase) throws
InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1046,7 +1247,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testUpdateNoPrimaryKeys() throws InitializationException,
ProcessException, SQLException {
+ public void testUpdateNoPrimaryKeys() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100),
code integer)");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1067,7 +1270,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testUpdateSpecifyUpdateKeys() throws InitializationException,
ProcessException, SQLException {
+ public void testUpdateSpecifyUpdateKeys() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100),
code integer)");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1113,7 +1318,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException,
ProcessException, SQLException {
+ public void testUpdateSpecifyUpdateKeysNotFirst() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
+
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100),
code integer)");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1159,7 +1366,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException,
ProcessException, SQLException {
+ public void testUpdateSpecifyQuotedUpdateKeys() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
+
recreateTable("CREATE TABLE PERSONS (\"id\" integer, \"name\"
varchar(100), \"code\" integer)");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1205,8 +1414,11 @@ public class PutDatabaseRecordTest {
conn.close();
}
- @Test
- void testDelete() throws InitializationException, ProcessException,
SQLException {
+ @ParameterizedTest()
+ @MethodSource("getTestCases")
+ public void testDelete(TestCase testCase) throws InitializationException,
ProcessException, SQLException {
+ setRunner(testCase);
+
recreateTable(createPersons);
Connection conn = dbcp.getConnection();
Statement stmt = conn.createStatement();
@@ -1250,7 +1462,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testDeleteWithNulls() throws InitializationException,
ProcessException, SQLException {
+ public void testDeleteWithNulls() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
+
recreateTable(createPersons);
Connection conn = dbcp.getConnection();
Statement stmt = conn.createStatement();
@@ -1294,7 +1508,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testRecordPathOptions() throws InitializationException, SQLException {
+ public void testRecordPathOptions() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable("CREATE TABLE PERSONS (id integer, name varchar(100),
code integer)");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1347,7 +1563,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertWithMaxBatchSize() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertWithMaxBatchSize() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1380,7 +1598,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertWithDefaultMaxBatchSize() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertWithDefaultMaxBatchSize() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1397,6 +1617,7 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+ runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE,
PutDatabaseRecord.MAX_BATCH_SIZE.getDefaultValue());
Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
@@ -1412,7 +1633,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testGenerateTableName() throws Exception {
+ public void testGenerateTableName() throws InitializationException,
ProcessException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
final List<RecordField> fields = Arrays.asList(new RecordField("id",
RecordFieldType.INT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("code", RecordFieldType.INT.getDataType()),
@@ -1446,7 +1669,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertMismatchedCompatibleDataTypes() throws
InitializationException, ProcessException, SQLException {
+ public void testInsertMismatchedCompatibleDataTypes() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1514,7 +1739,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertMismatchedNotCompatibleDataTypes() throws
InitializationException, ProcessException, SQLException {
+ public void testInsertMismatchedNotCompatibleDataTypes() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
@@ -1547,7 +1774,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testLongVarchar() throws InitializationException, ProcessException,
SQLException {
+ public void testLongVarchar() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
// Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
@@ -1590,7 +1819,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertWithDifferentColumnOrdering() throws
InitializationException, ProcessException, SQLException {
+ public void testInsertWithDifferentColumnOrdering() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
// Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
@@ -1637,7 +1868,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertWithBlobClob() throws Exception {
+ public void testInsertWithBlobClob() throws InitializationException,
ProcessException, SQLException, IOException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary
key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code
>= 0 AND code < 1000))";
@@ -1688,7 +1921,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertHexStringIntoBinary() throws Exception {
+ public void testInsertHexStringIntoBinary() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT,
PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL);
String tableName = "HEX_STRING_TEST";
@@ -1729,7 +1964,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertBase64StringIntoBinary() throws Exception {
+ public void testInsertBase64StringIntoBinary() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT,
PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64);
String tableName = "BASE64_STRING_TEST";
@@ -1770,7 +2007,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertWithBlobClobObjectArraySource() throws Exception {
+ public void testInsertWithBlobClobObjectArraySource() throws
InitializationException, ProcessException, SQLException, IOException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary
key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code
>= 0 AND code < 1000))";
@@ -1821,7 +2060,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertWithBlobStringSource() throws Exception {
+ public void testInsertWithBlobStringSource() throws
InitializationException, ProcessException, SQLException, IOException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary
key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code
>= 0 AND code < 1000))";
@@ -1866,7 +2107,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertWithBlobIntegerArraySource() throws Exception {
+ public void testInsertWithBlobIntegerArraySource() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary
key, name clob," +
"content blob, code integer CONSTRAINT CODE_RANGE CHECK (code
>= 0 AND code < 1000))";
@@ -1895,7 +2138,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertEnum() throws InitializationException, ProcessException,
SQLException, IOException {
+ public void testInsertEnum() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>());
@@ -1940,7 +2185,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertUUIDColumn() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertUUIDColumn() throws InitializationException,
ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
// Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
@@ -1980,7 +2227,9 @@ public class PutDatabaseRecordTest {
}
@Test
- void testInsertLongVarBinaryColumn() throws InitializationException,
ProcessException, SQLException {
+ public void testInsertLongVarBinaryColumn() throws
InitializationException, ProcessException, SQLException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
// Manually create and drop the tables and schemas
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
@@ -2023,7 +2272,7 @@ public class PutDatabaseRecordTest {
private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection();
- final Statement stmt = conn.createStatement()) {
+ final Statement stmt = conn.createStatement()) {
stmt.execute("drop table PERSONS");
stmt.execute(createPersons);
} catch (SQLException ignore) {
@@ -2053,9 +2302,9 @@ public class PutDatabaseRecordTest {
} catch (SQLException ignore) {
// Do nothing, may not have existed
}
- stmt.execute(createSQL);
- stmt.close();
- conn.close();
+ try (conn; stmt) {
+ stmt.execute(createSQL);
+ }
}
private Map<String, Object> createValues(final int id, final String name,
final int code) {
@@ -2109,4 +2358,33 @@ public class PutDatabaseRecordTest {
}
}
}
+
+ /**
+  * Holder for one test configuration: an auto-commit flag, a rollback-on-failure flag and a
+  * batch size. Any of the three values may be {@code null}; the matching {@code ...AsString()}
+  * accessor then returns {@code null} as well.
+  */
+ public static class TestCase {
+ private final Boolean autoCommit;
+ private final Boolean rollbackOnFailure;
+ private final Integer batchSize;
+
+ TestCase(Boolean autoCommit, Boolean rollbackOnFailure, Integer batchSize) {
+ this.autoCommit = autoCommit;
+ this.rollbackOnFailure = rollbackOnFailure;
+ this.batchSize = batchSize;
+ }
+
+ /** Returns the auto-commit flag as text, or {@code null} when no value was supplied. */
+ String getAutoCommitAsString() {
+ return autoCommit == null ? null : autoCommit.toString();
+ }
+
+ /** Returns the rollback-on-failure flag as text, or {@code null} when no value was supplied. */
+ String getRollbackOnFailureAsString() {
+ return rollbackOnFailure == null ? null : rollbackOnFailure.toString();
+ }
+
+ /** Returns the batch size as text, or {@code null} when no value was supplied. */
+ String getBatchSizeAsString() {
+ return batchSize == null ? null : batchSize.toString();
+ }
+
+ /** Renders all three settings; a missing value is printed as the text "null". */
+ @Override
+ public String toString() {
+ // String concatenation already converts a null reference to "null", so no String.valueOf is needed.
+ return "autoCommit=" + autoCommit +
+ "; rollbackOnFailure=" + rollbackOnFailure +
+ "; batchSize=" + batchSize;
+ }
+ }
}
\ No newline at end of file