This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e5f1db1970 NIFI-15224 Added Pre-Processing SQL Properties to PutDatabaseRecord
e5f1db1970 is described below

commit e5f1db1970159412e349d728272343f6f9658f82
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Jan 14 11:16:00 2026 -0600

    NIFI-15224 Added Pre-Processing SQL Properties to PutDatabaseRecord
    
    This closes #10771.
    
    Co-authored-by: spects <[email protected]>
    Signed-off-by: Pierre Villard <[email protected]>
---
 .../processors/standard/PutDatabaseRecord.java     | 68 ++++++++++++++++++++++
 .../processors/standard/PutDatabaseRecordTest.java | 54 +++++++++++++++++
 2 files changed, 122 insertions(+)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 6d8661fbbb..622ef738e9 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -215,6 +215,30 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .expressionLanguageSupported(NONE)
             .build();
 
+    static final PropertyDescriptor PRE_PROCESSING_SQL = new Builder()
+            .name("Pre-Processing SQL")
+            .description("""
+                    One or more SQL statements, separated by semicolons, 
executed on the current database connection,
+                    before processing records for each FlowFile. Literal 
semicolons can be included in SQL statements
+                    by prefixing the semicolon with a backslash character.
+                    """)
+            .required(false)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor POST_PROCESSING_SQL = new Builder()
+            .name("Post-Processing SQL")
+            .description("""
+                    One or more SQL statements, separated by semicolons, 
executed on the current database connection,
+                    after processing records for each FlowFile. Literal 
semicolons can be included in SQL statements
+                    by prefixing the semicolon with a backslash.
+                    """)
+            .required(false)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     static final PropertyDescriptor DBCP_SERVICE = new Builder()
             .name("Database Connection Pooling Service")
             .description("The Controller Service that is used to obtain a 
connection to the database for sending records.")
@@ -416,6 +440,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
     static final PropertyDescriptor DB_TYPE = 
DatabaseAdapterDescriptor.getDatabaseTypeDescriptor();
     static final PropertyDescriptor DATABASE_DIALECT_SERVICE = 
DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);
 
+    static final Pattern UNESCAPED_SEMICOLON_PATTERN = 
Pattern.compile("(?<!\\\\);");
+    static final String ESCAPED_SEMICOLON = "\\\\;";
+    static final String SEMICOLON = ";";
+
     protected static final List<PropertyDescriptor> properties = List.of(
             RECORD_READER_FACTORY,
             DB_TYPE,
@@ -423,6 +451,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
             STATEMENT_TYPE,
             STATEMENT_TYPE_RECORD_PATH,
             DATA_RECORD_PATH,
+            PRE_PROCESSING_SQL,
+            POST_PROCESSING_SQL,
             DBCP_SERVICE,
             CATALOG_NAME,
             SCHEMA_NAME,
@@ -1184,6 +1214,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
     private void putToDatabase(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile, final Connection connection) 
throws Exception {
         final String statementType = getStatementType(context, flowFile);
 
+        final List<String> preProcessingSqlStatements = 
getSqlStatements(context, flowFile, PRE_PROCESSING_SQL);
+        executeSqlStatements(connection, preProcessingSqlStatements, 
PRE_PROCESSING_SQL);
+
         try (final InputStream in = session.read(flowFile)) {
             final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
             final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger());
@@ -1195,6 +1228,41 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 executeDML(context, session, flowFile, connection, 
recordReader, statementType, settings);
             }
         }
+
+        final List<String> postProcessingSqlStatements = 
getSqlStatements(context, flowFile, POST_PROCESSING_SQL);
+        executeSqlStatements(connection, postProcessingSqlStatements, 
POST_PROCESSING_SQL);
+    }
+
+    private List<String> getSqlStatements(final ProcessContext context, final 
FlowFile flowFile, final PropertyDescriptor propertyDescriptor) {
+        final List<String> sqlStatements;
+
+        final String propertyValue = 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
+        if (propertyValue == null || propertyValue.isBlank()) {
+            sqlStatements = List.of();
+        } else {
+            final String[] statements = 
UNESCAPED_SEMICOLON_PATTERN.split(propertyValue);
+            sqlStatements = new ArrayList<>(statements.length);
+            for (String statement : statements) {
+                final String sqlStatement = 
statement.replaceAll(ESCAPED_SEMICOLON, SEMICOLON).trim();
+                if (sqlStatement.isEmpty()) {
+                    continue;
+                }
+                sqlStatements.add(sqlStatement);
+            }
+        }
+
+        return sqlStatements;
+    }
+
+    private void executeSqlStatements(final Connection connection, final 
List<String> sqlStatements, final PropertyDescriptor propertyDescriptor) {
+        for (final String sql : sqlStatements) {
+            try (Statement statement = connection.createStatement()) {
+                statement.execute(sql);
+            } catch (final SQLException e) {
+                final String message = "Failed to execute %s Statement 
[%s]".formatted(propertyDescriptor.getName(), sql);
+                throw new ProcessException(message, e);
+            }
+        }
     }
 
     String generateTableName(final DMLSettings settings, final String catalog, 
final String schemaName, final String tableName, final TableSchema tableSchema) 
{
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index fc7548bca3..4478bff83b 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -127,6 +127,7 @@ class PutDatabaseRecordTest extends AbstractDatabaseConnectionServiceTest {
     private static final boolean ENABLED = true;
     private static final boolean DISABLED = false;
 
+    private static final String BATCHES_EXECUTED_COUNTER = "Batches Executed";
     private static final String CONNECTION_FAILED = "Connection Failed";
 
     private static final String PARSER_ID = 
MockRecordParser.class.getSimpleName();
@@ -2211,6 +2212,59 @@ class PutDatabaseRecordTest extends AbstractDatabaseConnectionServiceTest {
         conn.close();
     }
 
+    @Test
+    public void testPrePostProcessingSql() throws InitializationException, 
ProcessException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService(PARSER_ID, parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addRecord(1, "testing");
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
+
+        final String dropCreateSql = "DROP TABLE IF EXISTS %s; 
%s".formatted(TABLE_NAME, createPersons);
+        runner.setProperty(PutDatabaseRecord.PRE_PROCESSING_SQL, 
dropCreateSql);
+        runner.setProperty(PutDatabaseRecord.POST_PROCESSING_SQL, "DROP TABLE 
PERSONS");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS);
+        final Long batchesExecuted = 
runner.getCounterValue(BATCHES_EXECUTED_COUNTER);
+        assertEquals(1, batchesExecuted, "Batches executed counter not 
matched");
+    }
+
+    @Test
+    public void testPreProcessingSqlFailure() throws InitializationException, 
ProcessException {
+        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
+
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService(PARSER_ID, parser);
+        runner.enableControllerService(parser);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
+
+        final String preProcessingSql = "DROP TABLE UNKNOWN";
+        runner.setProperty(PutDatabaseRecord.PRE_PROCESSING_SQL, 
preProcessingSql);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
+        final MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).getFirst();
+        
firstFlowFile.assertAttributeExists(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR);
+        final String recordError = 
firstFlowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR);
+        
assertTrue(recordError.contains(PutDatabaseRecord.PRE_PROCESSING_SQL.getName()),
 "Pre-Processing SQL not found in [%s]".formatted(recordError));
+    }
+
     private void recreateTable() throws ProcessException {
         try (final Connection conn = dbcp.getConnection();
              final Statement stmt = conn.createStatement()) {

Reply via email to