This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 70cd51ce70 NIFI-14266 Added Content Output Strategy property to
ExecuteSQL (#9748)
70cd51ce70 is described below
commit 70cd51ce700013cda861b1560d05df38df74dfeb
Author: Marcin Gemra <[email protected]>
AuthorDate: Wed Feb 26 16:39:09 2025 +0100
NIFI-14266 Added Content Output Strategy property to ExecuteSQL (#9748)
- Content Output Strategy supports current behavior of writing an empty
result set when the query returns no results, and also supports retaining the
original content for input FlowFiles
Co-authored-by: Marcin Gemra <[email protected]>
Co-authored-by: Pierre Villard <[email protected]>
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../processors/standard/AbstractExecuteSQL.java | 72 ++++++++++++++--
.../nifi/processors/standard/ExecuteSQL.java | 7 +-
.../nifi/processors/standard/ExecuteSQLRecord.java | 2 +-
.../nifi/processors/standard/TestExecuteSQL.java | 97 ++++++++++++++++++----
.../processors/standard/TestExecuteSQLRecord.java | 95 ++++++++++++++-------
5 files changed, 214 insertions(+), 59 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 8ac07ace50..b9ba6b5081 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -26,6 +27,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -101,9 +103,9 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- public static final PropertyDescriptor SQL_SELECT_QUERY = new
PropertyDescriptor.Builder()
- .name("SQL select query")
- .description("The SQL select query to execute. The query can be
empty, a constant value, or built from attributes "
+ public static final PropertyDescriptor SQL_QUERY = new
PropertyDescriptor.Builder()
+ .name("SQL Query")
+ .description("The SQL query to execute. The query can be empty, a
constant value, or built from attributes "
+ "using Expression Language. If this property is
specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the
content of the incoming flow file is expected "
+ "to contain a valid SQL select query, to be issued by
the processor to the database. Note that Expression "
@@ -188,6 +190,17 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
.required(true)
.build();
+ public static final PropertyDescriptor CONTENT_OUTPUT_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Content Output Strategy")
+ .description("""
+ Specifies the strategy for writing FlowFile content when
processing input FlowFiles.
+ The strategy applies when handling queries that do not
produce results.
+ """)
+ .allowableValues(ContentOutputStrategy.class)
+ .defaultValue(ContentOutputStrategy.EMPTY)
+ .required(true)
+ .build();
+
protected List<PropertyDescriptor> propDescriptors;
protected DBCPService dbcpService;
@@ -202,10 +215,15 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
return propDescriptors;
}
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ config.renameProperty("SQL select query", SQL_QUERY.getName());
+ }
+
@OnScheduled
public void setup(ProcessContext context) {
// If the query is not set, then an incoming flow file is needed.
Otherwise fail the initialization
- if (!context.getProperty(SQL_SELECT_QUERY).isSet() &&
!context.hasIncomingConnection()) {
+ if (!context.getProperty(SQL_QUERY).isSet() &&
!context.hasIncomingConnection()) {
final String errorString = "Either the Select Query must be
specified or there must be an incoming connection "
+ "providing flowfile(s) containing a SQL select query";
getLogger().error(errorString);
@@ -244,8 +262,8 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
SqlWriter sqlWriter = configureSqlWriter(session, context,
fileToProcess);
String selectQuery;
- if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
- selectQuery =
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+ if (context.getProperty(SQL_QUERY).isSet()) {
+ selectQuery =
context.getProperty(SQL_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is
required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be
called as the processor will fail when scheduled.
@@ -456,7 +474,13 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
session.remove(fileToProcess);
} else {
// If we had no results then transfer the original
flow file downstream to trigger processors
- session.transfer(setFlowFileEmptyResults(session,
fileToProcess, sqlWriter), REL_SUCCESS);
+ final ContentOutputStrategy contentOutputStrategy =
context.getProperty(CONTENT_OUTPUT_STRATEGY).asAllowableValue(ContentOutputStrategy.class);
+ if (ContentOutputStrategy.ORIGINAL ==
contentOutputStrategy) {
+ session.transfer(fileToProcess, REL_SUCCESS);
+ } else {
+ // Set Empty Results as the default behavior based
on strategy or null property
+ session.transfer(setFlowFileEmptyResults(session,
fileToProcess, sqlWriter), REL_SUCCESS);
+ }
}
} else if (resultCount == 0) {
// If we had no inbound FlowFile, no exceptions, and the
SQL generated no result sets (Insert/Update/Delete statements only)
@@ -531,4 +555,38 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
}
protected abstract SqlWriter configureSqlWriter(ProcessSession session,
ProcessContext context, FlowFile fileToProcess);
+
+ enum ContentOutputStrategy implements DescribedValue {
+ EMPTY(
+ "Empty",
+ "Overwrite the input FlowFile content with an empty result set"
+ ),
+ ORIGINAL(
+ "Original",
+ "Retain the input FlowFile content without changes"
+ );
+
+ private final String displayName;
+ private final String description;
+
+ ContentOutputStrategy(final String displayName, final String
description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return this.displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+ }
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index fb806eaed3..e7ef7e6697 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -36,12 +36,12 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.util.db.AvroUtil.CodecType;
import org.apache.nifi.util.db.JdbcCommon;
import java.util.List;
import java.util.Set;
-import static org.apache.nifi.util.db.AvroUtil.CodecType;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
import static org.apache.nifi.util.db.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
@@ -150,7 +150,7 @@ public class ExecuteSQL extends AbstractExecuteSQL {
propDescriptors = List.of(
DBCP_SERVICE,
SQL_PRE_QUERY,
- SQL_SELECT_QUERY,
+ SQL_QUERY,
SQL_POST_QUERY,
QUERY_TIMEOUT,
NORMALIZE_NAMES_FOR_AVRO,
@@ -161,7 +161,8 @@ public class ExecuteSQL extends AbstractExecuteSQL {
MAX_ROWS_PER_FLOW_FILE,
OUTPUT_BATCH_SIZE,
FETCH_SIZE,
- AUTO_COMMIT
+ AUTO_COMMIT,
+ CONTENT_OUTPUT_STRATEGY
);
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 88f0b9f761..0ac5234773 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -153,7 +153,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
DBCP_SERVICE,
SQL_PRE_QUERY,
- SQL_SELECT_QUERY,
+ SQL_QUERY,
SQL_POST_QUERY,
QUERY_TIMEOUT,
RECORD_WRITER_FACTORY,
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 2ee9c1ddea..81499c568c 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@@ -117,7 +118,7 @@ public class TestExecuteSQL {
@Test
public void testIncomingConnectionWithNoFlowFile() {
runner.setIncomingConnection(true);
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
persons");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM persons");
runner.run();
runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0);
@@ -203,7 +204,7 @@ public class TestExecuteSQL {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1,
1)");
runner.setIncomingConnection(false);
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -211,6 +212,71 @@ public class TestExecuteSQL {
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).getFirst().assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX,
"0");
}
+ @Test
+ public void testDropTableWithOverwrite() throws SQLException, IOException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_DROP_TABLE");
+ } catch (final SQLException ignored) {
+ }
+
+ stmt.execute("create table TEST_DROP_TABLE (id integer not null, val1
integer, val2 integer)");
+
+ stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (0,
NULL, 1)");
+ stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (1,
1, 1)");
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "DROP TABLE TEST_DROP_TABLE");
+ runner.enqueue("some data");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+
+ final List<MockFlowFile> flowfiles =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+ final InputStream in = new
ByteArrayInputStream(flowfiles.getFirst().toByteArray());
+ final DatumReader<GenericRecord> datumReader = new
GenericDatumReader<>();
+ try (DataFileStream<GenericRecord> dataFileReader = new
DataFileStream<>(in, datumReader)) {
+ assertFalse(dataFileReader.hasNext());
+ }
+ }
+
+ @Test
+ public void testDropTableNoOverwrite() throws SQLException, IOException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_TRUNCATE_TABLE");
+ } catch (final SQLException ignored) {
+ }
+
+ stmt.execute("create table TEST_TRUNCATE_TABLE (id integer not null,
val1 integer, val2 integer)");
+
+ stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES
(0, NULL, 1)");
+ stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES
(1, 1, 1)");
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQL.CONTENT_OUTPUT_STRATEGY,
AbstractExecuteSQL.ContentOutputStrategy.ORIGINAL);
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "TRUNCATE TABLE
TEST_TRUNCATE_TABLE");
+ runner.enqueue("some data");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+ runner.assertContents(ExecuteSQL.REL_SUCCESS, List.of("some data"));
+ }
+
@Test
public void testCompression() throws SQLException, IOException {
// remove previous test database, if any
@@ -233,7 +299,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.COMPRESSION_FORMAT,
AvroUtil.CodecType.BZIP2.name());
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -269,7 +335,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "5");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
@@ -311,7 +377,6 @@ public class TestExecuteSQL {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES ("
+ i + ", 1, 1)");
}
-
Map<String, String> attrMap = new HashMap<>();
String testAttrName = "attr1";
String testAttrValue = "value1";
@@ -342,8 +407,6 @@ public class TestExecuteSQL {
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(),
"199");
lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID,
inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
-
-
}
@Test
@@ -371,7 +434,7 @@ public class TestExecuteSQL {
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(AbstractExecuteSQL.FETCH_SIZE, "5");
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
@@ -410,7 +473,7 @@ public class TestExecuteSQL {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(false);
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "insert into
TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "insert into TEST_NULL_INT
(id, val1, val2) VALUES (0, NULL, 1)");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -435,7 +498,7 @@ public class TestExecuteSQL {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(true);
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from
TEST_NULL_INT");
runner.enqueue("Hello".getBytes());
runner.run();
@@ -481,7 +544,7 @@ public class TestExecuteSQL {
stmt.execute("insert into host2 values(1,'host2')");
stmt.execute("select a.host as hostA,b.host as hostB from host1 a join
host2 b on b.id=a.id");
runner.setIncomingConnection(false);
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select a.host as
hostA,b.host as hostB from host1 a join host2 b on b.id=a.id");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "select a.host as
hostA,b.host as hostB from host1 a join host2 b on b.id=a.id");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
@@ -507,7 +570,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(false);
// Try a valid SQL statement that will generate an error (val1 does
not exist, e.g.)
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM
TEST_NO_ROWS");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT val1 FROM
TEST_NO_ROWS");
runner.run();
//No incoming flow file containing a query, and an exception causes no
outbound flowfile.
@@ -584,7 +647,7 @@ public class TestExecuteSQL {
}
if (setQueryProperty) {
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
+ runner.setProperty(ExecuteSQL.SQL_QUERY, query);
}
runner.run();
@@ -639,7 +702,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from
TEST_NULL_INT");
runner.enqueue("test".getBytes());
runner.run();
@@ -684,7 +747,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from
TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
runner.enqueue("test".getBytes());
runner.run();
@@ -730,7 +793,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(true);
// Simulate failure by not provide parameter
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from
TEST_NULL_INT");
runner.enqueue("test".getBytes());
runner.run();
@@ -756,7 +819,7 @@ public class TestExecuteSQL {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from
TEST_NULL_INT");
// Simulate failure by not provide parameter
runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
runner.enqueue("test".getBytes());
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 1f0fe16e69..ba5edac95f 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -124,7 +124,7 @@ public class TestExecuteSQLRecord {
@Test
public void testIncomingConnectionWithNoFlowFile() throws
InitializationException {
runner.setIncomingConnection(true);
- runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
persons");
+ runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT * FROM
persons");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -209,7 +209,7 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "5");
- runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS,
200);
@@ -354,7 +354,7 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(false);
runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "0");
- runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -402,7 +402,7 @@ public class TestExecuteSQLRecord {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(false);
- runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "insert into
TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+ runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "insert into
TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -442,7 +442,7 @@ public class TestExecuteSQLRecord {
+ "CAST ('Hello World' AS CLOB), CAST ('I am an NCLOB' AS
NCLOB))");
runner.setIncomingConnection(false);
- runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "select * from
TEST_NULL_INT");
AvroRecordSetWriter recordWriter = new AvroRecordSetWriter();
runner.addControllerService("writer", recordWriter);
runner.setProperty(recordWriter,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
@@ -455,25 +455,26 @@ public class TestExecuteSQLRecord {
flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT,
"1");
ByteArrayInputStream bais = new
ByteArrayInputStream(flowFile.toByteArray());
- final DataFileStream<GenericRecord> dataFileStream = new
DataFileStream<>(bais, new GenericDatumReader<>());
- final Schema avroSchema = dataFileStream.getSchema();
- GenericData.setStringType(avroSchema, GenericData.StringType.String);
- final GenericRecord avroRecord = dataFileStream.next();
-
- Object imageObj = avroRecord.get("IMAGE");
- assertNotNull(imageObj);
- assertInstanceOf(ByteBuffer.class, imageObj);
- assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE,
(byte) 0xEF}, ((ByteBuffer) imageObj).array());
-
- Object wordsObj = avroRecord.get("WORDS");
- assertNotNull(wordsObj);
- assertInstanceOf(Utf8.class, wordsObj);
- assertEquals("Hello World", wordsObj.toString());
-
- Object natwordsObj = avroRecord.get("NATWORDS");
- assertNotNull(natwordsObj);
- assertInstanceOf(Utf8.class, natwordsObj);
- assertEquals("I am an NCLOB", natwordsObj.toString());
+ try (DataFileStream<GenericRecord> dataFileStream = new
DataFileStream<>(bais, new GenericDatumReader<>())) {
+ final Schema avroSchema = dataFileStream.getSchema();
+ GenericData.setStringType(avroSchema,
GenericData.StringType.String);
+ final GenericRecord avroRecord = dataFileStream.next();
+
+ Object imageObj = avroRecord.get("IMAGE");
+ assertNotNull(imageObj);
+ assertInstanceOf(ByteBuffer.class, imageObj);
+ assertArrayEquals(new byte[] {(byte) 0xDE, (byte) 0xAD, (byte)
0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array());
+
+ Object wordsObj = avroRecord.get("WORDS");
+ assertNotNull(wordsObj);
+ assertInstanceOf(Utf8.class, wordsObj);
+ assertEquals("Hello World", wordsObj.toString());
+
+ Object natwordsObj = avroRecord.get("NATWORDS");
+ assertNotNull(natwordsObj);
+ assertInstanceOf(Utf8.class, natwordsObj);
+ assertEquals("I am an NCLOB", natwordsObj.toString());
+ }
}
@Test
@@ -494,7 +495,39 @@ public class TestExecuteSQLRecord {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(true);
- runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from
TEST_NULL_INT");
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.enqueue("Hello".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"0");
+ firstFlowFile.assertContentEquals("");
+ }
+
+ @Test
+ public void testNoResultCreatesEmptyFlowFile() throws Exception {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException ignored) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "drop table
TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -527,7 +560,7 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(false);
// Try a valid SQL statement that will generate an error (val1 does
not exist, e.g.)
- runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT val1
FROM TEST_NO_ROWS");
+ runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT val1 FROM
TEST_NO_ROWS");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -581,7 +614,7 @@ public class TestExecuteSQLRecord {
}
if (setQueryProperty) {
- runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, query);
+ runner.setProperty(AbstractExecuteSQL.SQL_QUERY, query);
}
runner.run();
@@ -654,7 +687,7 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from
TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -687,7 +720,7 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from
TEST_NULL_INT");
runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
@@ -721,7 +754,7 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(true);
// Simulate failure by not provide parameter
runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
- runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from
TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -751,7 +784,7 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "select * from
TEST_NULL_INT");
// Simulate failure by not provide parameter
runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);