This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch NIFI-14266 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 8e52d9c53d1b556482d278bb61227a971972625b Author: Pierre Villard <[email protected]> AuthorDate: Thu Feb 13 20:34:55 2025 +0100 NIFI-14266 - Allow ExecuteSQL to not overwrite flowfile content when no result --- .../processors/standard/AbstractExecuteSQL.java | 20 +++++- .../nifi/processors/standard/ExecuteSQL.java | 5 +- .../nifi/processors/standard/TestExecuteSQL.java | 69 ++++++++++++++++++++- .../processors/standard/TestExecuteSQLRecord.java | 71 ++++++++++++++++------ 4 files changed, 139 insertions(+), 26 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index 8ac07ace50..245180e7e6 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -103,7 +103,8 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() .name("SQL select query") - .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes " + .displayName("SQL Query") + .description("The SQL query to execute. The query can be empty, a constant value, or built from attributes " + "using Expression Language. If this property is specified, it will be used regardless of the content of " + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression " @@ -188,6 +189,17 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { .required(true) .build(); + public static final PropertyDescriptor OVERWRITE_FLOW_FILE_CONTENT = new PropertyDescriptor.Builder() + .name("Overwrite FlowFile Content") + .description(""" + If the query didn't return any result, this property specifies if the processor should overwrite the + FlowFile's content (when the processor is triggered by an incoming FlowFile). + """) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + protected List<PropertyDescriptor> propDescriptors; protected DBCPService dbcpService; @@ -456,7 +468,11 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { session.remove(fileToProcess); } else { // If we had no results then transfer the original flow file downstream to trigger processors - session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS); + if (context.getProperty(OVERWRITE_FLOW_FILE_CONTENT).asBoolean() == null || context.getProperty(OVERWRITE_FLOW_FILE_CONTENT).asBoolean()) { + session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS); + } else { + session.transfer(fileToProcess, REL_SUCCESS); + } } } else if (resultCount == 0) { // If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index fb806eaed3..f681f45ed4 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -36,12 +36,12 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter; import org.apache.nifi.processors.standard.sql.SqlWriter; +import org.apache.nifi.util.db.AvroUtil.CodecType; import org.apache.nifi.util.db.JdbcCommon; import java.util.List; import java.util.Set; -import static org.apache.nifi.util.db.AvroUtil.CodecType; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE; import static org.apache.nifi.util.db.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO; @@ -161,7 +161,8 @@ public class ExecuteSQL extends AbstractExecuteSQL { MAX_ROWS_PER_FLOW_FILE, OUTPUT_BATCH_SIZE, FETCH_SIZE, - AUTO_COMMIT + AUTO_COMMIT, + OVERWRITE_FLOW_FILE_CONTENT ); } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 2ee9c1ddea..706d3b6f25 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -211,6 +212,71 @@ public class TestExecuteSQL { runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).getFirst().assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0"); } + @Test + public void testDropTableWithOverwrite() throws SQLException, IOException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_DROP_TABLE"); + } catch (final SQLException ignored) { + } + + stmt.execute("create table TEST_DROP_TABLE (id integer not null, val1 integer, val2 integer)"); + + stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "DROP TABLE TEST_DROP_TABLE"); + runner.enqueue("some data"); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + + final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS); + final InputStream in = new ByteArrayInputStream(flowfiles.getFirst().toByteArray()); + final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { + assertFalse(dataFileReader.hasNext()); + } + } + + @Test + public void testDropTableNoOverwrite() throws SQLException, IOException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_TRUNCATE_TABLE"); + } catch (final SQLException ignored) { + } + + stmt.execute("create table TEST_TRUNCATE_TABLE (id integer not null, val1 integer, val2 integer)"); + + stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.OVERWRITE_FLOW_FILE_CONTENT, "false"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "TRUNCATE TABLE TEST_TRUNCATE_TABLE"); + runner.enqueue("some data"); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + runner.assertContents(ExecuteSQL.REL_SUCCESS, List.of("some data")); + } + @Test public void testCompression() throws SQLException, IOException { // remove previous test database, if any @@ -311,7 +377,6 @@ public class TestExecuteSQL { stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)"); } - Map<String, String> attrMap = new HashMap<>(); String testAttrName = "attr1"; String testAttrValue = "value1"; @@ -342,8 +407,6 @@ public class TestExecuteSQL { lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199"); lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue); lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key())); - - } @Test diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index 1f0fe16e69..7ed2883dcb 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -455,25 +455,26 @@ public class TestExecuteSQLRecord { flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "1"); ByteArrayInputStream bais = new ByteArrayInputStream(flowFile.toByteArray()); - final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(bais, new GenericDatumReader<>()); - final Schema avroSchema = dataFileStream.getSchema(); - GenericData.setStringType(avroSchema, GenericData.StringType.String); - final GenericRecord avroRecord = dataFileStream.next(); - - Object imageObj = avroRecord.get("IMAGE"); - assertNotNull(imageObj); - assertInstanceOf(ByteBuffer.class, imageObj); - assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array()); - - Object wordsObj = avroRecord.get("WORDS"); - assertNotNull(wordsObj); - assertInstanceOf(Utf8.class, wordsObj); - assertEquals("Hello World", wordsObj.toString()); - - Object natwordsObj = avroRecord.get("NATWORDS"); - assertNotNull(natwordsObj); - assertInstanceOf(Utf8.class, natwordsObj); - assertEquals("I am an NCLOB", natwordsObj.toString()); + try (DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(bais, new GenericDatumReader<>())) { + final Schema avroSchema = dataFileStream.getSchema(); + GenericData.setStringType(avroSchema, GenericData.StringType.String); + final GenericRecord avroRecord = dataFileStream.next(); + + Object imageObj = avroRecord.get("IMAGE"); + assertNotNull(imageObj); + assertInstanceOf(ByteBuffer.class, imageObj); + assertArrayEquals(new byte[] {(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array()); + + Object wordsObj = avroRecord.get("WORDS"); + assertNotNull(wordsObj); + assertInstanceOf(Utf8.class, wordsObj); + assertEquals("Hello World", wordsObj.toString()); + + Object natwordsObj = avroRecord.get("NATWORDS"); + assertNotNull(natwordsObj); + assertInstanceOf(Utf8.class, natwordsObj); + assertEquals("I am an NCLOB", natwordsObj.toString()); + } } @Test @@ -508,6 +509,38 @@ public class TestExecuteSQLRecord { firstFlowFile.assertContentEquals(""); } + @Test + public void testNoResultCreatesEmptyFlowFile() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException ignored) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "drop table TEST_NULL_INT"); + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); + runner.addControllerService("writer", recordWriter); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + runner.enqueue("Hello".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1); + MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0); + firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "0"); + firstFlowFile.assertContentEquals(""); + } + @Test public void testWithSqlException() throws Exception { // remove previous test database, if any
