This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch NIFI-14266
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 8e52d9c53d1b556482d278bb61227a971972625b
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Feb 13 20:34:55 2025 +0100

    NIFI-14266 - Allow ExecuteSQL to not overwrite flowfile content when no result
---
 .../processors/standard/AbstractExecuteSQL.java    | 20 +++++-
 .../nifi/processors/standard/ExecuteSQL.java       |  5 +-
 .../nifi/processors/standard/TestExecuteSQL.java   | 69 ++++++++++++++++++++-
 .../processors/standard/TestExecuteSQLRecord.java  | 71 ++++++++++++++++------
 4 files changed, 139 insertions(+), 26 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 8ac07ace50..245180e7e6 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -103,7 +103,8 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
 
     public static final PropertyDescriptor SQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
             .name("SQL select query")
-            .description("The SQL select query to execute. The query can be 
empty, a constant value, or built from attributes "
+            .displayName("SQL Query")
+            .description("The SQL query to execute. The query can be empty, a 
constant value, or built from attributes "
                     + "using Expression Language. If this property is 
specified, it will be used regardless of the content of "
                     + "incoming flowfiles. If this property is empty, the 
content of the incoming flow file is expected "
                     + "to contain a valid SQL select query, to be issued by 
the processor to the database. Note that Expression "
@@ -188,6 +189,17 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
             .required(true)
             .build();
 
+    public static final PropertyDescriptor OVERWRITE_FLOW_FILE_CONTENT = new 
PropertyDescriptor.Builder()
+            .name("Overwrite FlowFile Content")
+            .description("""
+                    If the query didn't return any result, this property 
specifies if the processor should overwrite the
+                    FlowFile's content (when the processor is triggered by an 
incoming FlowFile).
+                    """)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
     protected List<PropertyDescriptor> propDescriptors;
 
     protected DBCPService dbcpService;
@@ -456,7 +468,11 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                         session.remove(fileToProcess);
                     } else {
                         // If we had no results then transfer the original 
flow file downstream to trigger processors
-                        session.transfer(setFlowFileEmptyResults(session, 
fileToProcess, sqlWriter), REL_SUCCESS);
+                        if 
(context.getProperty(OVERWRITE_FLOW_FILE_CONTENT).asBoolean() == null || 
context.getProperty(OVERWRITE_FLOW_FILE_CONTENT).asBoolean()) {
+                            session.transfer(setFlowFileEmptyResults(session, 
fileToProcess, sqlWriter), REL_SUCCESS);
+                        } else {
+                            session.transfer(fileToProcess, REL_SUCCESS);
+                        }
                     }
                 } else if (resultCount == 0) {
                     // If we had no inbound FlowFile, no exceptions, and the 
SQL generated no result sets (Insert/Update/Delete statements only)
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index fb806eaed3..f681f45ed4 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -36,12 +36,12 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.util.db.AvroUtil.CodecType;
 import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.util.db.AvroUtil.CodecType;
 import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
 import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
 import static org.apache.nifi.util.db.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
@@ -161,7 +161,8 @@ public class ExecuteSQL extends AbstractExecuteSQL {
                 MAX_ROWS_PER_FLOW_FILE,
                 OUTPUT_BATCH_SIZE,
                 FETCH_SIZE,
-                AUTO_COMMIT
+                AUTO_COMMIT,
+                OVERWRITE_FLOW_FILE_CONTENT
         );
     }
 
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 2ee9c1ddea..706d3b6f25 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -211,6 +212,71 @@ public class TestExecuteSQL {
         
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).getFirst().assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX,
 "0");
     }
 
+    @Test
+    public void testDropTableWithOverwrite() throws SQLException, IOException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_DROP_TABLE");
+        } catch (final SQLException ignored) {
+        }
+
+        stmt.execute("create table TEST_DROP_TABLE (id integer not null, val1 
integer, val2 integer)");
+
+        stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (0, 
NULL, 1)");
+        stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (1, 
1, 1)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "DROP TABLE 
TEST_DROP_TABLE");
+        runner.enqueue("some data");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> flowfiles = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+        final InputStream in = new 
ByteArrayInputStream(flowfiles.getFirst().toByteArray());
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
+            assertFalse(dataFileReader.hasNext());
+        }
+    }
+
+    @Test
+    public void testDropTableNoOverwrite() throws SQLException, IOException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_TRUNCATE_TABLE");
+        } catch (final SQLException ignored) {
+        }
+
+        stmt.execute("create table TEST_TRUNCATE_TABLE (id integer not null, 
val1 integer, val2 integer)");
+
+        stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES 
(0, NULL, 1)");
+        stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES 
(1, 1, 1)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.OVERWRITE_FLOW_FILE_CONTENT, "false");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "TRUNCATE TABLE 
TEST_TRUNCATE_TABLE");
+        runner.enqueue("some data");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        runner.assertContents(ExecuteSQL.REL_SUCCESS, List.of("some data"));
+    }
+
     @Test
     public void testCompression() throws SQLException, IOException {
         // remove previous test database, if any
@@ -311,7 +377,6 @@ public class TestExecuteSQL {
             stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" 
+ i + ", 1, 1)");
         }
 
-
         Map<String, String> attrMap = new HashMap<>();
         String testAttrName = "attr1";
         String testAttrValue = "value1";
@@ -342,8 +407,6 @@ public class TestExecuteSQL {
         
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"199");
         lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
         
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, 
inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
-
-
     }
 
     @Test
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 1f0fe16e69..7ed2883dcb 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -455,25 +455,26 @@ public class TestExecuteSQLRecord {
         flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, 
"1");
 
         ByteArrayInputStream bais = new 
ByteArrayInputStream(flowFile.toByteArray());
-        final DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(bais, new GenericDatumReader<>());
-        final Schema avroSchema = dataFileStream.getSchema();
-        GenericData.setStringType(avroSchema, GenericData.StringType.String);
-        final GenericRecord avroRecord = dataFileStream.next();
-
-        Object imageObj = avroRecord.get("IMAGE");
-        assertNotNull(imageObj);
-        assertInstanceOf(ByteBuffer.class, imageObj);
-        assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, 
(byte) 0xEF}, ((ByteBuffer) imageObj).array());
-
-        Object wordsObj = avroRecord.get("WORDS");
-        assertNotNull(wordsObj);
-        assertInstanceOf(Utf8.class, wordsObj);
-        assertEquals("Hello World", wordsObj.toString());
-
-        Object natwordsObj = avroRecord.get("NATWORDS");
-        assertNotNull(natwordsObj);
-        assertInstanceOf(Utf8.class, natwordsObj);
-        assertEquals("I am an NCLOB", natwordsObj.toString());
+        try (DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(bais, new GenericDatumReader<>())) {
+            final Schema avroSchema = dataFileStream.getSchema();
+            GenericData.setStringType(avroSchema, 
GenericData.StringType.String);
+            final GenericRecord avroRecord = dataFileStream.next();
+
+            Object imageObj = avroRecord.get("IMAGE");
+            assertNotNull(imageObj);
+            assertInstanceOf(ByteBuffer.class, imageObj);
+            assertArrayEquals(new byte[] {(byte) 0xDE, (byte) 0xAD, (byte) 
0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array());
+
+            Object wordsObj = avroRecord.get("WORDS");
+            assertNotNull(wordsObj);
+            assertInstanceOf(Utf8.class, wordsObj);
+            assertEquals("Hello World", wordsObj.toString());
+
+            Object natwordsObj = avroRecord.get("NATWORDS");
+            assertNotNull(natwordsObj);
+            assertInstanceOf(Utf8.class, natwordsObj);
+            assertEquals("I am an NCLOB", natwordsObj.toString());
+        }
     }
 
     @Test
@@ -508,6 +509,38 @@ public class TestExecuteSQLRecord {
         firstFlowFile.assertContentEquals("");
     }
 
+    @Test
+    public void testNoResultCreatesEmptyFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException ignored) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "drop table 
TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("Hello".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"0");
+        firstFlowFile.assertContentEquals("");
+    }
+
     @Test
     public void testWithSqlException() throws Exception {
         // remove previous test database, if any

Reply via email to