This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e5f1db1970 NIFI-15224 Added Pre-Processing SQL Properties to PutDatabaseRecord
e5f1db1970 is described below
commit e5f1db1970159412e349d728272343f6f9658f82
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Jan 14 11:16:00 2026 -0600
NIFI-15224 Added Pre-Processing SQL Properties to PutDatabaseRecord
This closes #10771.
Co-authored-by: spects <[email protected]>
Signed-off-by: Pierre Villard <[email protected]>
---
.../processors/standard/PutDatabaseRecord.java | 68 ++++++++++++++++++++++
.../processors/standard/PutDatabaseRecordTest.java | 54 +++++++++++++++++
2 files changed, 122 insertions(+)
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 6d8661fbbb..622ef738e9 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -215,6 +215,30 @@ public class PutDatabaseRecord extends AbstractProcessor {
.expressionLanguageSupported(NONE)
.build();
+ static final PropertyDescriptor PRE_PROCESSING_SQL = new Builder()
+ .name("Pre-Processing SQL")
+ .description("""
+ One or more SQL statements, separated by semicolons, executed on the current database connection,
+ before processing records for each FlowFile. Literal semicolons can be included in SQL statements
+ by prefixing the semicolon with a backslash character.
+ """)
+ .required(false)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor POST_PROCESSING_SQL = new Builder()
+ .name("Post-Processing SQL")
+ .description("""
+ One or more SQL statements, separated by semicolons, executed on the current database connection,
+ after processing records for each FlowFile. Literal semicolons can be included in SQL statements
+ by prefixing the semicolon with a backslash.
+ """)
+ .required(false)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
static final PropertyDescriptor DBCP_SERVICE = new Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain a connection to the database for sending records.")
@@ -416,6 +440,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor();
static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);
+ static final Pattern UNESCAPED_SEMICOLON_PATTERN = Pattern.compile("(?<!\\\\);");
+ static final String ESCAPED_SEMICOLON = "\\\\;";
+ static final String SEMICOLON = ";";
+
protected static final List<PropertyDescriptor> properties = List.of(
RECORD_READER_FACTORY,
DB_TYPE,
@@ -423,6 +451,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
STATEMENT_TYPE,
STATEMENT_TYPE_RECORD_PATH,
DATA_RECORD_PATH,
+ PRE_PROCESSING_SQL,
+ POST_PROCESSING_SQL,
DBCP_SERVICE,
CATALOG_NAME,
SCHEMA_NAME,
@@ -1184,6 +1214,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
private void putToDatabase(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Connection connection) throws Exception {
final String statementType = getStatementType(context, flowFile);
+ final List<String> preProcessingSqlStatements = getSqlStatements(context, flowFile, PRE_PROCESSING_SQL);
+ executeSqlStatements(connection, preProcessingSqlStatements, PRE_PROCESSING_SQL);
+
try (final InputStream in = session.read(flowFile)) {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
@@ -1195,6 +1228,41 @@ public class PutDatabaseRecord extends AbstractProcessor {
executeDML(context, session, flowFile, connection, recordReader, statementType, settings);
}
}
+
+ final List<String> postProcessingSqlStatements = getSqlStatements(context, flowFile, POST_PROCESSING_SQL);
+ executeSqlStatements(connection, postProcessingSqlStatements, POST_PROCESSING_SQL);
+ }
+
+ private List<String> getSqlStatements(final ProcessContext context, final FlowFile flowFile, final PropertyDescriptor propertyDescriptor) {
+ final List<String> sqlStatements;
+
+ final String propertyValue = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
+ if (propertyValue == null || propertyValue.isBlank()) {
+ sqlStatements = List.of();
+ } else {
+ final String[] statements = UNESCAPED_SEMICOLON_PATTERN.split(propertyValue);
+ sqlStatements = new ArrayList<>(statements.length);
+ for (String statement : statements) {
+ final String sqlStatement = statement.replaceAll(ESCAPED_SEMICOLON, SEMICOLON).trim();
+ if (sqlStatement.isEmpty()) {
+ continue;
+ }
+ sqlStatements.add(sqlStatement);
+ }
+ }
+
+ return sqlStatements;
+ }
+
+ private void executeSqlStatements(final Connection connection, final List<String> sqlStatements, final PropertyDescriptor propertyDescriptor) {
+ for (final String sql : sqlStatements) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (final SQLException e) {
+ final String message = "Failed to execute %s Statement [%s]".formatted(propertyDescriptor.getName(), sql);
+ throw new ProcessException(message, e);
+ }
+ }
}
String generateTableName(final DMLSettings settings, final String catalog, final String schemaName, final String tableName, final TableSchema tableSchema) {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index fc7548bca3..4478bff83b 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -127,6 +127,7 @@ class PutDatabaseRecordTest extends AbstractDatabaseConnectionServiceTest {
private static final boolean ENABLED = true;
private static final boolean DISABLED = false;
+ private static final String BATCHES_EXECUTED_COUNTER = "Batches Executed";
private static final String CONNECTION_FAILED = "Connection Failed";
private static final String PARSER_ID = MockRecordParser.class.getSimpleName();
@@ -2211,6 +2212,59 @@ class PutDatabaseRecordTest extends AbstractDatabaseConnectionServiceTest {
conn.close();
}
+ @Test
+ public void testPrePostProcessingSql() throws InitializationException, ProcessException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService(PARSER_ID, parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addRecord(1, "testing");
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
+
+ final String dropCreateSql = "DROP TABLE IF EXISTS %s; %s".formatted(TABLE_NAME, createPersons);
+ runner.setProperty(PutDatabaseRecord.PRE_PROCESSING_SQL, dropCreateSql);
+ runner.setProperty(PutDatabaseRecord.POST_PROCESSING_SQL, "DROP TABLE PERSONS");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS);
+ final Long batchesExecuted = runner.getCounterValue(BATCHES_EXECUTED_COUNTER);
+ assertEquals(1, batchesExecuted, "Batches executed counter not matched");
+ }
+
+ @Test
+ public void testPreProcessingSqlFailure() throws InitializationException, ProcessException {
+ setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService(PARSER_ID, parser);
+ runner.enableControllerService(parser);
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
+
+ final String preProcessingSql = "DROP TABLE UNKNOWN";
+ runner.setProperty(PutDatabaseRecord.PRE_PROCESSING_SQL, preProcessingSql);
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
+ final MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).getFirst();
+ firstFlowFile.assertAttributeExists(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR);
+ final String recordError = firstFlowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR);
+ assertTrue(recordError.contains(PutDatabaseRecord.PRE_PROCESSING_SQL.getName()), "Pre-Processing SQL not found in [%s]".formatted(recordError));
+ }
+
private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement()) {