This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 70cd51ce70 NIFI-14266 Added Content Output Strategy property to 
ExecuteSQL (#9748)
70cd51ce70 is described below

commit 70cd51ce700013cda861b1560d05df38df74dfeb
Author: Marcin Gemra <[email protected]>
AuthorDate: Wed Feb 26 16:39:09 2025 +0100

    NIFI-14266 Added Content Output Strategy property to ExecuteSQL (#9748)
    
    - Content Output Strategy supports the current behavior of writing an empty 
result set when the query returns no results, and also supports retaining the 
original content for input FlowFiles
    
    Co-authored-by: Marcin Gemra <[email protected]>
    Co-authored-by: Pierre Villard <[email protected]>
    Co-authored-by: David Handermann <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/standard/AbstractExecuteSQL.java    | 72 ++++++++++++++--
 .../nifi/processors/standard/ExecuteSQL.java       |  7 +-
 .../nifi/processors/standard/ExecuteSQLRecord.java |  2 +-
 .../nifi/processors/standard/TestExecuteSQL.java   | 97 ++++++++++++++++++----
 .../processors/standard/TestExecuteSQLRecord.java  | 95 ++++++++++++++-------
 5 files changed, 214 insertions(+), 59 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 8ac07ace50..b9ba6b5081 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -26,6 +27,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -101,9 +103,9 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
-    public static final PropertyDescriptor SQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
-            .name("SQL select query")
-            .description("The SQL select query to execute. The query can be 
empty, a constant value, or built from attributes "
+    public static final PropertyDescriptor SQL_QUERY = new 
PropertyDescriptor.Builder()
+            .name("SQL Query")
+            .description("The SQL query to execute. The query can be empty, a 
constant value, or built from attributes "
                     + "using Expression Language. If this property is 
specified, it will be used regardless of the content of "
                     + "incoming flowfiles. If this property is empty, the 
content of the incoming flow file is expected "
                     + "to contain a valid SQL select query, to be issued by 
the processor to the database. Note that Expression "
@@ -188,6 +190,17 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
             .required(true)
             .build();
 
+    public static final PropertyDescriptor CONTENT_OUTPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Content Output Strategy")
+            .description("""
+                    Specifies the strategy for writing FlowFile content when 
processing input FlowFiles.
+                    The strategy applies when handling queries that do not 
produce results.
+                    """)
+            .allowableValues(ContentOutputStrategy.class)
+            .defaultValue(ContentOutputStrategy.EMPTY)
+            .required(true)
+            .build();
+
     protected List<PropertyDescriptor> propDescriptors;
 
     protected DBCPService dbcpService;
@@ -202,10 +215,15 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
         return propDescriptors;
     }
 
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        config.renameProperty("SQL select query", SQL_QUERY.getName());
+    }
+
     @OnScheduled
     public void setup(ProcessContext context) {
         // If the query is not set, then an incoming flow file is needed. 
Otherwise fail the initialization
-        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && 
!context.hasIncomingConnection()) {
+        if (!context.getProperty(SQL_QUERY).isSet() && 
!context.hasIncomingConnection()) {
             final String errorString = "Either the Select Query must be 
specified or there must be an incoming connection "
                     + "providing flowfile(s) containing a SQL select query";
             getLogger().error(errorString);
@@ -244,8 +262,8 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
         SqlWriter sqlWriter = configureSqlWriter(session, context, 
fileToProcess);
 
         String selectQuery;
-        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
-            selectQuery = 
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        if (context.getProperty(SQL_QUERY).isSet()) {
+            selectQuery = 
context.getProperty(SQL_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         } else {
             // If the query is not set, then an incoming flow file is 
required, and expected to contain a valid SQL select query.
             // If there is no incoming connection, onTrigger will not be 
called as the processor will fail when scheduled.
@@ -456,7 +474,13 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                         session.remove(fileToProcess);
                     } else {
                         // If we had no results then transfer the original 
flow file downstream to trigger processors
-                        session.transfer(setFlowFileEmptyResults(session, 
fileToProcess, sqlWriter), REL_SUCCESS);
+                        final ContentOutputStrategy contentOutputStrategy = 
context.getProperty(CONTENT_OUTPUT_STRATEGY).asAllowableValue(ContentOutputStrategy.class);
+                        if (ContentOutputStrategy.ORIGINAL == 
contentOutputStrategy) {
+                            session.transfer(fileToProcess, REL_SUCCESS);
+                        } else {
+                            // Set Empty Results as the default behavior based 
on strategy or null property
+                            session.transfer(setFlowFileEmptyResults(session, 
fileToProcess, sqlWriter), REL_SUCCESS);
+                        }
                     }
                 } else if (resultCount == 0) {
                     // If we had no inbound FlowFile, no exceptions, and the 
SQL generated no result sets (Insert/Update/Delete statements only)
@@ -531,4 +555,38 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
     }
 
     protected abstract SqlWriter configureSqlWriter(ProcessSession session, 
ProcessContext context, FlowFile fileToProcess);
+
+    enum ContentOutputStrategy implements DescribedValue {
+        EMPTY(
+            "Empty",
+            "Overwrite the input FlowFile content with an empty result set"
+        ),
+        ORIGINAL(
+            "Original",
+            "Retain the input FlowFile content without changes"
+        );
+
+        private final String displayName;
+        private final String description;
+
+        ContentOutputStrategy(final String displayName, final String 
description) {
+            this.displayName = displayName;
+            this.description = description;
+        }
+
+        @Override
+        public String getValue() {
+            return name();
+        }
+
+        @Override
+        public String getDisplayName() {
+            return this.displayName;
+        }
+
+        @Override
+        public String getDescription() {
+            return this.description;
+        }
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index fb806eaed3..e7ef7e6697 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -36,12 +36,12 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.util.db.AvroUtil.CodecType;
 import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.util.db.AvroUtil.CodecType;
 import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
 import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
 import static org.apache.nifi.util.db.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
@@ -150,7 +150,7 @@ public class ExecuteSQL extends AbstractExecuteSQL {
         propDescriptors = List.of(
                 DBCP_SERVICE,
                 SQL_PRE_QUERY,
-                SQL_SELECT_QUERY,
+                SQL_QUERY,
                 SQL_POST_QUERY,
                 QUERY_TIMEOUT,
                 NORMALIZE_NAMES_FOR_AVRO,
@@ -161,7 +161,8 @@ public class ExecuteSQL extends AbstractExecuteSQL {
                 MAX_ROWS_PER_FLOW_FILE,
                 OUTPUT_BATCH_SIZE,
                 FETCH_SIZE,
-                AUTO_COMMIT
+                AUTO_COMMIT,
+                CONTENT_OUTPUT_STRATEGY
         );
     }
 
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 88f0b9f761..0ac5234773 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -153,7 +153,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
             DBCP_SERVICE,
             SQL_PRE_QUERY,
-            SQL_SELECT_QUERY,
+            SQL_QUERY,
             SQL_POST_QUERY,
             QUERY_TIMEOUT,
             RECORD_WRITER_FACTORY,
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 2ee9c1ddea..81499c568c 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -117,7 +118,7 @@ public class TestExecuteSQL {
     @Test
     public void testIncomingConnectionWithNoFlowFile() {
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
persons");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM persons");
         runner.run();
         runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
         runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0);
@@ -203,7 +204,7 @@ public class TestExecuteSQL {
         stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 
1)");
 
         runner.setIncomingConnection(false);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM 
TEST_NULL_INT");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -211,6 +212,71 @@ public class TestExecuteSQL {
         
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).getFirst().assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX,
 "0");
     }
 
+    @Test
+    public void testDropTableWithOverwrite() throws SQLException, IOException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_DROP_TABLE");
+        } catch (final SQLException ignored) {
+        }
+
+        stmt.execute("create table TEST_DROP_TABLE (id integer not null, val1 
integer, val2 integer)");
+
+        stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (0, 
NULL, 1)");
+        stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (1, 
1, 1)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "DROP TABLE TEST_DROP_TABLE");
+        runner.enqueue("some data");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> flowfiles = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+        final InputStream in = new 
ByteArrayInputStream(flowfiles.getFirst().toByteArray());
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
+            assertFalse(dataFileReader.hasNext());
+        }
+    }
+
+    @Test
+    public void testDropTableNoOverwrite() throws SQLException, IOException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_TRUNCATE_TABLE");
+        } catch (final SQLException ignored) {
+        }
+
+        stmt.execute("create table TEST_TRUNCATE_TABLE (id integer not null, 
val1 integer, val2 integer)");
+
+        stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES 
(0, NULL, 1)");
+        stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES 
(1, 1, 1)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.CONTENT_OUTPUT_STRATEGY, 
AbstractExecuteSQL.ContentOutputStrategy.ORIGINAL);
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "TRUNCATE TABLE 
TEST_TRUNCATE_TABLE");
+        runner.enqueue("some data");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        runner.assertContents(ExecuteSQL.REL_SUCCESS, List.of("some data"));
+    }
+
     @Test
     public void testCompression() throws SQLException, IOException {
         // remove previous test database, if any
@@ -233,7 +299,7 @@ public class TestExecuteSQL {
 
         runner.setIncomingConnection(false);
         runner.setProperty(ExecuteSQL.COMPRESSION_FORMAT, 
AvroUtil.CodecType.BZIP2.name());
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM 
TEST_NULL_INT");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -269,7 +335,7 @@ public class TestExecuteSQL {
         runner.setIncomingConnection(false);
         runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
         runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "5");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM 
TEST_NULL_INT");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
@@ -311,7 +377,6 @@ public class TestExecuteSQL {
             stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" 
+ i + ", 1, 1)");
         }
 
-
         Map<String, String> attrMap = new HashMap<>();
         String testAttrName = "attr1";
         String testAttrValue = "value1";
@@ -342,8 +407,6 @@ public class TestExecuteSQL {
         
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"199");
         lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
         
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, 
inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
-
-
     }
 
     @Test
@@ -371,7 +434,7 @@ public class TestExecuteSQL {
         runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
         runner.setProperty(AbstractExecuteSQL.FETCH_SIZE, "5");
         runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM 
TEST_NULL_INT");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
@@ -410,7 +473,7 @@ public class TestExecuteSQL {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(false);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "insert into 
TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "insert into TEST_NULL_INT 
(id, val1, val2) VALUES (0, NULL, 1)");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -435,7 +498,7 @@ public class TestExecuteSQL {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from 
TEST_NULL_INT");
         runner.enqueue("Hello".getBytes());
         runner.run();
 
@@ -481,7 +544,7 @@ public class TestExecuteSQL {
         stmt.execute("insert into host2 values(1,'host2')");
         stmt.execute("select a.host as hostA,b.host as hostB from host1 a join 
host2 b on b.id=a.id");
         runner.setIncomingConnection(false);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select a.host as 
hostA,b.host as hostB from host1 a join host2 b on b.id=a.id");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "select a.host as 
hostA,b.host as hostB from host1 a join host2 b on b.id=a.id");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -507,7 +570,7 @@ public class TestExecuteSQL {
 
         runner.setIncomingConnection(false);
         // Try a valid SQL statement that will generate an error (val1 does 
not exist, e.g.)
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM 
TEST_NO_ROWS");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT val1 FROM 
TEST_NO_ROWS");
         runner.run();
 
         //No incoming flow file containing a query, and an exception causes no 
outbound flowfile.
@@ -584,7 +647,7 @@ public class TestExecuteSQL {
         }
 
         if (setQueryProperty) {
-            runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
+            runner.setProperty(ExecuteSQL.SQL_QUERY, query);
         }
 
         runner.run();
@@ -639,7 +702,7 @@ public class TestExecuteSQL {
 
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from 
TEST_NULL_INT");
         runner.enqueue("test".getBytes());
         runner.run();
 
@@ -684,7 +747,7 @@ public class TestExecuteSQL {
 
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from 
TEST_NULL_INT");
         runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
         runner.enqueue("test".getBytes());
         runner.run();
@@ -730,7 +793,7 @@ public class TestExecuteSQL {
         runner.setIncomingConnection(true);
         // Simulate failure by not provide parameter
         runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from 
TEST_NULL_INT");
         runner.enqueue("test".getBytes());
         runner.run();
 
@@ -756,7 +819,7 @@ public class TestExecuteSQL {
 
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from 
TEST_NULL_INT");
         // Simulate failure by not provide parameter
         runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
         runner.enqueue("test".getBytes());
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 1f0fe16e69..ba5edac95f 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -124,7 +124,7 @@ public class TestExecuteSQLRecord {
     @Test
     public void testIncomingConnectionWithNoFlowFile() throws 
InitializationException {
         runner.setIncomingConnection(true);
-        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
persons");
+        runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT * FROM 
persons");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -209,7 +209,7 @@ public class TestExecuteSQLRecord {
         runner.setIncomingConnection(false);
         runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
         runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "5");
-        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "SELECT * FROM 
TEST_NULL_INT");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 
200);
@@ -354,7 +354,7 @@ public class TestExecuteSQLRecord {
         runner.setIncomingConnection(false);
         runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
         runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "0");
-        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT * FROM 
TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -402,7 +402,7 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(false);
-        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "insert into 
TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "insert into 
TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -442,7 +442,7 @@ public class TestExecuteSQLRecord {
                 + "CAST ('Hello World' AS CLOB), CAST ('I am an NCLOB' AS 
NCLOB))");
 
         runner.setIncomingConnection(false);
-        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "select * from 
TEST_NULL_INT");
         AvroRecordSetWriter recordWriter = new AvroRecordSetWriter();
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(recordWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
@@ -455,25 +455,26 @@ public class TestExecuteSQLRecord {
         flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, 
"1");
 
         ByteArrayInputStream bais = new 
ByteArrayInputStream(flowFile.toByteArray());
-        final DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(bais, new GenericDatumReader<>());
-        final Schema avroSchema = dataFileStream.getSchema();
-        GenericData.setStringType(avroSchema, GenericData.StringType.String);
-        final GenericRecord avroRecord = dataFileStream.next();
-
-        Object imageObj = avroRecord.get("IMAGE");
-        assertNotNull(imageObj);
-        assertInstanceOf(ByteBuffer.class, imageObj);
-        assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, 
(byte) 0xEF}, ((ByteBuffer) imageObj).array());
-
-        Object wordsObj = avroRecord.get("WORDS");
-        assertNotNull(wordsObj);
-        assertInstanceOf(Utf8.class, wordsObj);
-        assertEquals("Hello World", wordsObj.toString());
-
-        Object natwordsObj = avroRecord.get("NATWORDS");
-        assertNotNull(natwordsObj);
-        assertInstanceOf(Utf8.class, natwordsObj);
-        assertEquals("I am an NCLOB", natwordsObj.toString());
+        try (DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(bais, new GenericDatumReader<>())) {
+            final Schema avroSchema = dataFileStream.getSchema();
+            GenericData.setStringType(avroSchema, 
GenericData.StringType.String);
+            final GenericRecord avroRecord = dataFileStream.next();
+
+            Object imageObj = avroRecord.get("IMAGE");
+            assertNotNull(imageObj);
+            assertInstanceOf(ByteBuffer.class, imageObj);
+            assertArrayEquals(new byte[] {(byte) 0xDE, (byte) 0xAD, (byte) 
0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array());
+
+            Object wordsObj = avroRecord.get("WORDS");
+            assertNotNull(wordsObj);
+            assertInstanceOf(Utf8.class, wordsObj);
+            assertEquals("Hello World", wordsObj.toString());
+
+            Object natwordsObj = avroRecord.get("NATWORDS");
+            assertNotNull(natwordsObj);
+            assertInstanceOf(Utf8.class, natwordsObj);
+            assertEquals("I am an NCLOB", natwordsObj.toString());
+        }
     }
 
     @Test
@@ -494,7 +495,39 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from 
TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("Hello".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"0");
+        firstFlowFile.assertContentEquals("");
+    }
+
+    @Test
+    public void testNoResultCreatesEmptyFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException ignored) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "drop table 
TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -527,7 +560,7 @@ public class TestExecuteSQLRecord {
 
         runner.setIncomingConnection(false);
         // Try a valid SQL statement that will generate an error (val1 does 
not exist, e.g.)
-        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 
FROM TEST_NO_ROWS");
+        runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT val1 FROM 
TEST_NO_ROWS");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -581,7 +614,7 @@ public class TestExecuteSQLRecord {
         }
 
         if (setQueryProperty) {
-            runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, query);
+            runner.setProperty(AbstractExecuteSQL.SQL_QUERY, query);
         }
 
         runner.run();
@@ -654,7 +687,7 @@ public class TestExecuteSQLRecord {
 
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from 
TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -687,7 +720,7 @@ public class TestExecuteSQLRecord {
 
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from 
TEST_NULL_INT");
         runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
@@ -721,7 +754,7 @@ public class TestExecuteSQLRecord {
         runner.setIncomingConnection(true);
         // Simulate failure by not provide parameter
         runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
-        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from 
TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -751,7 +784,7 @@ public class TestExecuteSQLRecord {
 
         runner.setIncomingConnection(true);
         runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from 
TEST_NULL_INT");
         // Simulate failure by not provide parameter
         runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);


Reply via email to