This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 165a174c0a NIFI-13103: Make AutoCommit default to no value set in PutDatabaseRecord
165a174c0a is described below
commit 165a174c0a4bdb889907d22804fd8b6e42ecdf9e
Author: Joe Gresock <[email protected]>
AuthorDate: Wed May 1 17:20:18 2024 -0400
NIFI-13103: Make AutoCommit default to no value set in PutDatabaseRecord
Signed-off-by: Joe Gresock <[email protected]>
This closes #8723
---
.../processors/standard/PutDatabaseRecord.java | 86 ++++++++------------
.../processors/standard/PutDatabaseRecordTest.java | 94 +++++++++++++---------
2 files changed, 92 insertions(+), 88 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index c81030d74c..c279569916 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
@@ -74,7 +73,6 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLDataException;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
@@ -86,7 +84,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -368,8 +365,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
static final PropertyDescriptor MAX_BATCH_SIZE = new Builder()
.name("put-db-record-max-batch-size")
.displayName("Maximum Batch Size")
- .description("Specifies maximum number of statements to be
included in each batch. Zero means the batch size is not limited, "
- + "which can cause memory usage issues for a large number
of statements.")
+ .description("Specifies maximum number of sql statements to be
included in each batch sent to the database. Zero means the batch size is not
limited, "
+ + "and all statements are put into a single batch which
can cause high memory usage issues for a very large number of statements.")
.defaultValue("1000")
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
@@ -380,11 +377,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
.name("database-session-autocommit")
.displayName("Database Session AutoCommit")
.description("The autocommit mode to set on the database
connection being used. If set to false, the operation(s) will be explicitly
committed or rolled back "
- + "(based on success or failure respectively). If set to
true, the driver/database automatically handles the commit/rollback.")
+ + "(based on success or failure respectively). If set to
true, the driver/database automatically handles the commit/rollback. "
+ + "Setting this property to 'No value' will leave the
database connection's autocommit mode unmodified.")
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final PropertyDescriptor DB_TYPE;
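For illustration, the three states the property now supports map directly onto plain JDBC, as in this minimal sketch (the H2 URL and table here are placeholders, not part of this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class AutoCommitModes {
        public static void main(String[] args) throws SQLException {
            // Placeholder in-memory H2 database; any JDBC connection behaves the same way.
            try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
                Boolean autoCommit = null; // 'No value' -> leave the driver's default mode untouched

                if (autoCommit != null) {
                    conn.setAutoCommit(autoCommit); // explicit true/false overrides the driver default
                }

                try (Statement stmt = conn.createStatement()) {
                    stmt.execute("CREATE TABLE t (id INT)");
                    stmt.execute("INSERT INTO t VALUES (1)");
                }

                if (!conn.getAutoCommit()) {
                    conn.commit(); // a manual commit is only needed when autocommit is off
                }
            }
        }
    }

Setting autoCommit to Boolean.TRUE or Boolean.FALSE in the sketch exercises the other two modes.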
@@ -491,21 +488,22 @@ public class PutDatabaseRecord extends AbstractProcessor {
);
}
- final boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
+ final Boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
final boolean rollbackOnFailure = validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
- if (autoCommit && rollbackOnFailure) {
+ if (autoCommit != null && autoCommit && rollbackOnFailure) {
validationResults.add(new ValidationResult.Builder()
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
.explanation(format("'%s' cannot be set to 'true' when
'%s' is also set to 'true'. "
- + "Transaction rollbacks for batch updates
cannot be supported when auto commit is set to 'true'",
+ + "Transaction rollbacks for batch updates
cannot rollback all the flow file's statements together "
+ + "when auto commit is set to 'true'
because the database autocommits each batch separately.",
RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(),
AUTO_COMMIT.getDisplayName()))
.build());
}
- if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
+ if (autoCommit != null && autoCommit &&
!isMaxBatchSizeHardcodedToZero(validationContext)) {
final String explanation = format("'%s' must be hard-coded to
zero when '%s' is set to 'true'."
+ " Batch size equal to zero executes all
statements in a single transaction"
- + " which allows automatic rollback to revert
all statements if an error occurs",
+ + " which allows rollback to revert all the
flow file's statements together if an error occurs.",
MAX_BATCH_SIZE.getDisplayName(),
AUTO_COMMIT.getDisplayName());
validationResults.add(new ValidationResult.Builder()
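Taken together, the two checks above fire only when the property is explicitly true. A simplified standalone model of the combined rules (the real code checks isMaxBatchSizeHardcodedToZero(...) rather than a plain int, which is an assumption of this sketch):

    class AutoCommitValidation {
        // Simplified model of the customValidate() rules above.
        // autoCommit is Boolean, not boolean: null means the property is unset ('No value').
        static boolean isValidCombination(Boolean autoCommit, boolean rollbackOnFailure, int maxBatchSize) {
            if (autoCommit == null || !autoCommit) {
                return true; // unset or false: no autocommit-specific constraints apply
            }
            // autocommit == true: rollback on failure cannot work, and batch size must be 0
            return !rollbackOnFailure && maxBatchSize == 0;
        }
    }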
@@ -547,11 +545,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
}
- @OnUnscheduled
- public final void onUnscheduled() {
- supportsBatchUpdates = Optional.empty();
- }
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@@ -567,18 +560,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
connection = dbcpService.getConnection(flowFile.getAttributes());
originalAutoCommit = connection.getAutoCommit();
- final boolean autoCommit = context.getProperty(AUTO_COMMIT).asBoolean();
- if (originalAutoCommit != autoCommit) {
+ final Boolean propertyAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
+ if (propertyAutoCommitValue != null && originalAutoCommit != propertyAutoCommitValue) {
try {
- connection.setAutoCommit(autoCommit);
- } catch (SQLFeatureNotSupportedException sfnse) {
- getLogger().debug(String.format("setAutoCommit(%s) not
supported by this driver", autoCommit), sfnse);
+ connection.setAutoCommit(propertyAutoCommitValue);
+ } catch (Exception ex) {
+ getLogger().debug("Failed to setAutoCommit({}) due to {}",
propertyAutoCommitValue, ex.getClass().getName(), ex);
}
}
putToDatabase(context, session, flowFile, connection);
- // Only commit the connection if auto-commit is false
+ // If the connection's auto-commit setting is false, then manually commit the transaction
if (!connection.getAutoCommit()) {
connection.commit();
}
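Condensed, the connection handling above follows this order. A sketch with error handling elided; restoring the mode in a finally block is an assumption here, modeling why originalAutoCommit is captured:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    class AutoCommitHandling {
        // 'requested' models the AutoCommit property value;
        // null means 'No value', i.e. never call setAutoCommit at all.
        static void runWithAutoCommit(Connection connection, Boolean requested, String sql) throws SQLException {
            final boolean originalAutoCommit = connection.getAutoCommit();
            try {
                if (requested != null && originalAutoCommit != requested) {
                    connection.setAutoCommit(requested); // only touch the mode when the property is set
                }
                try (Statement stmt = connection.createStatement()) {
                    stmt.executeUpdate(sql);
                }
                if (!connection.getAutoCommit()) {
                    connection.commit(); // manual commit only when the session is not autocommitting
                }
            } finally {
                connection.setAutoCommit(originalAutoCommit); // restore before the pool reuses the connection
            }
        }
    }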
@@ -605,12 +598,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
relationship = REL_FAILURE;
}
- getLogger().error("Failed to put Records to database for {}. Routing
to {}.", flowFile, relationship, e);
-
final boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
if (rollbackOnFailure) {
+ getLogger().error("Failed to put Records to database for {}.
Rolling back NiFi session and returning the flow file to its incoming queue.",
flowFile, e);
session.rollback();
+ context.yield();
} else {
+ getLogger().error("Failed to put Records to database for {}.
Routing to {}.", flowFile, relationship, e);
flowFile = session.putAttribute(flowFile,
PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown":
e.getMessage()));
session.transfer(flowFile, relationship);
}
@@ -623,6 +617,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
try {
if (!connection.getAutoCommit()) {
connection.rollback();
+ getLogger().debug("Manually rolled back JDBC
transaction.");
}
} catch (final Exception rollbackException) {
getLogger().error("Failed to rollback JDBC transaction",
rollbackException);
@@ -680,9 +675,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ComponentLog log = getLogger();
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
- // Do not use batch if set to batch size of 1 because that is similar to not using batching.
+ // Batch Size 0 means put all sql statements into one batch update no matter how many statements there are.
+ // Do not use batch statements if batch size is equal to 1 because that is the same as not using batching.
// Also do not use batches if the connection does not support batching.
- boolean useBatch = maxBatchSize != 1 && isSupportBatchUpdates(connection);
+ boolean useBatch = maxBatchSize != 1 && isSupportsBatchUpdates(connection);
int currentBatchSize = 0;
int batchIndex = 0;
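The new comments describe the flush policy: 0 accumulates everything into a single executeBatch call, 1 falls back to per-statement execution, and N flushes every N statements. A standalone sketch under those rules (hypothetical persons table and column, batch support assumed):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    class BatchFlushPolicy {
        static void insertAll(Connection conn, List<Integer> ids, int maxBatchSize) throws SQLException {
            final boolean useBatch = maxBatchSize != 1; // a batch of 1 is the same as no batching
            try (PreparedStatement ps = conn.prepareStatement("INSERT INTO persons (id) VALUES (?)")) {
                int currentBatchSize = 0;
                for (final int id : ids) {
                    ps.setInt(1, id);
                    if (!useBatch) {
                        ps.executeUpdate(); // no batching: execute each statement directly
                        continue;
                    }
                    ps.addBatch();
                    currentBatchSize++;
                    if (maxBatchSize > 0 && currentBatchSize >= maxBatchSize) {
                        ps.executeBatch(); // flush every maxBatchSize statements
                        currentBatchSize = 0;
                    }
                }
                if (useBatch && currentBatchSize > 0) {
                    ps.executeBatch(); // trailing partial batch, or the single batch when maxBatchSize == 0
                }
            }
        }
    }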
@@ -1002,13 +998,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
try (InputStream inputStream = new ByteArrayInputStream(byteArray)) {
ps.setBlob(index, inputStream);
} catch (SQLException e) {
- throw new IOException("Unable to parse binary data " +
value, e.getCause());
+ throw new IOException("Unable to parse binary data " +
value, e);
}
} else {
try (InputStream inputStream = new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8))) {
ps.setBlob(index, inputStream);
} catch (IOException | SQLException e) {
- throw new IOException("Unable to parse binary data " +
value, e.getCause());
+ throw new IOException("Unable to parse binary data " +
value, e);
}
}
} else if (sqlType == Types.CLOB) {
@@ -1024,7 +1020,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
clob.setString(1, value.toString());
ps.setClob(index, clob);
} catch (SQLException e) {
- throw new IOException("Unable to parse data as CLOB/String
" + value, e.getCause());
+ throw new IOException("Unable to parse data as CLOB/String
" + value, e);
}
}
} else if (sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
@@ -1045,7 +1041,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
try {
ps.setBytes(index, byteArray);
} catch (SQLException e) {
- throw new IOException("Unable to parse binary data with
size" + byteArray.length, e.getCause());
+ throw new IOException("Unable to parse binary data with
size" + byteArray.length, e);
}
} else {
byte[] byteArray = new byte[0];
@@ -1053,7 +1049,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
byteArray = value.toString().getBytes(StandardCharsets.UTF_8);
ps.setBytes(index, byteArray);
} catch (SQLException e) {
- throw new IOException("Unable to parse binary data with
size" + byteArray.length, e.getCause());
+ throw new IOException("Unable to parse binary data with
size" + byteArray.length, e);
}
}
} else {
@@ -1612,28 +1608,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
return normalizedKeyColumnNames;
}
- private Optional<Boolean> supportsBatchUpdates = Optional.empty();
-
- private void initializeSupportBatchUpdates(Connection connection) {
- if (!supportsBatchUpdates.isPresent()) {
- try {
- final DatabaseMetaData dmd = connection.getMetaData();
- supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
- getLogger().debug(String.format("Connection
supportsBatchUpdates is %s",
- supportsBatchUpdates.orElse(Boolean.FALSE)));
- } catch (Exception ex) {
- supportsBatchUpdates = Optional.of(Boolean.FALSE);
- getLogger().debug(String.format("Exception while testing if
connection supportsBatchUpdates due to %s - %s",
- ex.getClass().getName(), ex.getMessage()));
- }
+ private boolean isSupportsBatchUpdates(Connection connection) {
+ try {
+ return connection.getMetaData().supportsBatchUpdates();
+ } catch (Exception ex) {
+ getLogger().debug(String.format("Exception while testing if
connection supportsBatchUpdates due to %s - %s",
+ ex.getClass().getName(), ex.getMessage()));
+ return false;
}
}
- private boolean isSupportBatchUpdates(Connection connection) {
- initializeSupportBatchUpdates(connection);
- return supportsBatchUpdates.orElse(Boolean.FALSE);
- }
-
static class SchemaKey {
private final String catalog;
private final String schemaName;
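The replacement helper is stateless: batch support is simply re-read from DatabaseMetaData for each connection instead of being cached in a field. A runnable sketch of the same probe (H2 used here only as an example driver):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    class BatchSupportProbe {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:probe")) {
                boolean supported;
                try {
                    supported = conn.getMetaData().supportsBatchUpdates();
                } catch (SQLException e) {
                    supported = false; // treat an uncooperative driver as having no batch support
                }
                System.out.println("supportsBatchUpdates = " + supported);
            }
        }
    }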
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 5c5cd2e8fa..733b109e80 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -94,11 +95,12 @@ public class PutDatabaseRecordTest {
// DISABLED test cases are used for single-run tests which are not parameterized
DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
- DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
+ DEFAULT_2(DISABLED, new TestCase(null, false, 2)),
+ DEFAULT_5(DISABLED, new TestCase(null, false, 5)),
DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
- ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
+ ROLLBACK_1(ENABLED, new TestCase(null, true, 1)),
ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
@@ -199,7 +201,11 @@ public class PutDatabaseRecordTest {
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
- runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
+ if (testCase.getAutoCommitAsString() == null) {
+ runner.removeProperty(PutDatabaseRecord.AUTO_COMMIT);
+ } else {
+ runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
+ }
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, testCase.getRollbackOnFailureAsString());
runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, testCase.getBatchSizeAsString());
}
@@ -714,11 +720,29 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, String.valueOf(allowMultipleStatements));
+ Supplier<Statement> spyStmt = createStatementSpy();
+
final Map<String, String> attrs = new HashMap<>();
attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
runner.enqueue(new byte[0], attrs);
runner.run();
+ final int maxBatchSize = runner.getProcessContext().getProperty(PutDatabaseRecord.MAX_BATCH_SIZE).asInteger();
+ assertNotNull(spyStmt.get());
+ if (sqlStatements.length <= 1) {
+ // When there is only 1 sql statement, then never use batching
+ verify(spyStmt.get(), times(0)).executeBatch();
+ } else if (maxBatchSize == 0) {
+ // When maxBatchSize is 0, verify that all statements were executed in a single executeBatch call
+ verify(spyStmt.get(), times(1)).executeBatch();
+ } else if (maxBatchSize == 1) {
+ // When maxBatchSize is 1, verify that executeBatch was never called
+ verify(spyStmt.get(), times(0)).executeBatch();
+ } else {
+ // When maxBatchSize > 1, verify that executeBatch was called at least once
+ verify(spyStmt.get(), atLeastOnce()).executeBatch();
+ }
+
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
@@ -1557,43 +1581,27 @@ public class PutDatabaseRecordTest {
}
@Test
- public void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException {
- setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
-
- recreateTable(createPersons);
- final MockRecordParser parser = new MockRecordParser();
- runner.addControllerService("parser", parser);
- runner.enableControllerService(parser);
-
- parser.addSchemaField("id", RecordFieldType.INT);
- parser.addSchemaField("name", RecordFieldType.STRING);
- parser.addSchemaField("code", RecordFieldType.INT);
-
- for (int i = 1; i < 12; i++) {
- parser.addRecord(i, String.format("rec%s", i), 100 + i);
- }
-
- runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
- runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
- runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
- runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "5");
-
- Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
-
- runner.enqueue(new byte[0]);
- runner.run();
-
- runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+ public void testInsertWithMaxBatchSize0() throws InitializationException, ProcessException, SQLException {
+ testInsertWithBatchSize(TestCaseEnum.DEFAULT_0.getTestCase(), 1);
+ }
- assertEquals(11, getTableSize());
+ @Test
+ public void testInsertWithMaxBatchSize1() throws InitializationException, ProcessException, SQLException {
+ testInsertWithBatchSize(TestCaseEnum.DEFAULT_1.getTestCase(), 11);
+ }
- assertNotNull(spyStmt.get());
- verify(spyStmt.get(), times(3)).executeBatch();
+ @Test
+ public void testInsertWithMaxBatchSize5() throws InitializationException, ProcessException, SQLException {
+ testInsertWithBatchSize(TestCaseEnum.DEFAULT_5.getTestCase(), 3);
}
@Test
- public void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException {
- setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
+ public void testInsertWithMaxBatchSize1000() throws InitializationException, ProcessException, SQLException {
+ testInsertWithBatchSize(TestCaseEnum.DEFAULT_1000.getTestCase(), 1);
+ }
+
+ public void testInsertWithBatchSize(TestCase testCase, int expectedBatchCount) throws InitializationException, ProcessException, SQLException {
+ setRunner(testCase);
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
@@ -1611,7 +1619,6 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
- runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, PutDatabaseRecord.MAX_BATCH_SIZE.getDefaultValue());
Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
@@ -1623,7 +1630,7 @@ public class PutDatabaseRecordTest {
assertEquals(11, getTableSize());
assertNotNull(spyStmt.get());
- verify(spyStmt.get(), times(1)).executeBatch();
+ verify(spyStmt.get(), times(expectedBatchCount)).executeBatch();
}
@Test
@@ -2322,6 +2329,19 @@ public class PutDatabaseRecordTest {
return () -> spyStmt[0];
}
+ private Supplier<Statement> createStatementSpy() {
+ final Statement[] spyStmt = new Statement[1];
+ final Answer<DelegatingConnection> answer = (inv) -> new DelegatingConnection((Connection) inv.callRealMethod()) {
+ @Override
+ public Statement createStatement() throws SQLException {
+ spyStmt[0] = spy(getDelegate().createStatement());
+ return spyStmt[0];
+ }
+ };
+ doAnswer(answer).when(dbcp).getConnection();
+ return () -> spyStmt[0];
+ }
+
static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
@Override
SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException {
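For context on the new createStatementSpy() helper above: it wraps the pooled connection in a Commons DBCP DelegatingConnection so that the Statement handed to the processor is a Mockito spy, which verify(...) can later interrogate for executeBatch calls. The underlying spy/verify mechanic in isolation (a sketch assuming H2 and Mockito on the classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    class StatementSpyDemo {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:spydemo")) {
                // A spy delegates to the real Statement while recording every call.
                Statement spied = spy(conn.createStatement());
                spied.execute("CREATE TABLE t (id INT)");
                verify(spied, times(1)).execute("CREATE TABLE t (id INT)");
            }
        }
    }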